diff --git a/README.md b/README.md index 0c4f470..bd1a035 100644 --- a/README.md +++ b/README.md @@ -38,7 +38,8 @@ command: {config,submit,submit-file,create,select,list,upload,download,delete} config configure AWS credentials submit submit your study, dataset or biomaterials metadata (incomplete as all metadata types is not supported yet, expected to be completed on August 2024) - submit-file submit your metadata file containing your cell lines, differentiated cell lines, library preparations and sequencing files + submit-file submit your metadata file with cell lines, differentiated products, library preparations, sequencing files, + and optionally context-specific data (e.g., pooled or unperturbed experiments) create create an upload area (authorised users only) select select or show the active upload area list list contents of the area @@ -59,6 +60,16 @@ Use the tool by specifying a command (`cmd` - see list below) to run, any mandat and `ARG2` - see positional args for each command), and any optional arguments (e.g. `-o1` and `o2` - see options for each command). +### What’s new + +**Automatic clonal-cell-line reuse** – if a clonal cell-line label in your +spreadsheet already exists in the ingest database, `morphic-util` will detect +it and link to the existing record instead of creating a duplicate. + +**Configurable ingest endpoint** – set the environment variable +`INGEST_API_BASE` in `spreadsheet_util.py:parse_cell_lines` (defaults to `https://api.ingest.archive.morphic.bio`) to +target a different ingest deployment without editing code. + ## Commands Help with specific command: @@ -88,10 +99,25 @@ Submit your study and dataset metadata and create your AWS upload area for uploa ```shell script positional arguments: -$ morphic-util submit --type --file - - --type type of metadata being submitted (e.g. 
study or dataset) - --file path to the file containing the metadata +$ morphic-util submit --type --file [--study ] --dataset-type [--derived-from ] + + Required: + --type: type of metadata being submitted (e.g. study or dataset) + --file: path to the file containing the metadata + + Required for datasets: + --dataset-type: Dataset type (e.g., raw, filtered, processed, analysis) + + Conditionally required for datasets: + --derived-from: Comma-separated list of dataset IDs this dataset is derived from + + Optional (for datasets): + --study: Link the dataset to an existing study + + Validation rules (for datasets): + raw: Must not include --derived-from + filtered, processed: Must be derived from a raw dataset + analysis: Must be derived from a processed dataset ``` ## `submit-file` command @@ -99,11 +125,22 @@ Submit your study and dataset metadata and create your AWS upload area for uploa ```shell script positional arguments: -$ morphic-util submit-file --file --action --dataset +$ morphic-util submit-file --file --action --dataset [--context ] +positional arguments: --file path to the file containing the metadata --action ADD, MODIFY or DELETE based on the type of submission --dataset the identifier for the analysis + +optional arguments: + --context optional ingestion context, e.g.: + 'pooled_differentiated' → for MSK pooled datasets + 'unperturbed_multiple' → for UCSF datasets + If omitted, legacy behavior is used +``` +Example usage: +```shell script +morphic-util submit-file --file my_file.xlsx --action ADD --dataset 67f8519e68005a3744c40fcf --context pooled_differentiated ``` ## `create` command @@ -205,11 +242,17 @@ $ morphic-util submit --type study --file ### Create your dataset and link it to your study ```shell script positional arguments: -$ morphic-util submit --type dataset --file --study - +$ morphic-util submit --type dataset --file [--study ] [--dataset-type ] [--derived-from ] --type type of metadata being submitted (here it is dataset) --file 
path to the file containing the metadata (optional) --study STUDY_ID obtained in the last step + --dataset-type: One of raw, filtered, processed, or analysis (required) + --derived-from: Comma-separated list of dataset IDs this dataset is derived from (required for all except raw) + + Validation rules: + raw: Must not have --derived-from + filtered or processed: Must be derived from raw + analysis: Must be derived from one or more processed datasets ``` ### `select` your upload area to upload your data files (the upload area name is same as your DATASET_ID) Show or select the data file upload area diff --git a/ait/commons/util/__main__.py b/ait/commons/util/__main__.py index 987197b..5d96dc0 100755 --- a/ait/commons/util/__main__.py +++ b/ait/commons/util/__main__.py @@ -79,17 +79,27 @@ def parse_args(args): parser_config.add_argument('PASSWORD', help='AWS Cognito password', nargs='?') parser_config.add_argument('--bucket', help='use BUCKET instead of default bucket') - parser_config = cmd_parser.add_parser('submit', help='submit your metadata') - parser_config.add_argument('--type', help='data type you are submitting, e.g. 
study, dataset') - parser_config.add_argument('--file', help='your metadata') - parser_config.add_argument('--study', help='your study reference') - parser_config.add_argument('--dataset', help='your dataset reference') - parser_config.add_argument('--process', help='your process/analysis reference') + parser_submit = cmd_parser.add_parser('submit', help='submit your metadata') + parser_submit.add_argument('--type', required=True, choices=['study', 'dataset'], help='data type you are submitting') + parser_submit.add_argument('--file', required=True, help='your metadata file path') + parser_submit.add_argument('--study', help='your study reference') + parser_submit.add_argument('--dataset', help='your dataset reference') + parser_submit.add_argument('--process', help='your process/analysis reference') + parser_submit.add_argument('--dataset-type', choices=['raw', 'processed', 'filtered', 'analysis'], + help='dataset type (required if --type=dataset)') + parser_submit.add_argument('--derived-from', help='Comma-separated dataset IDs this dataset is derived from') parser_config = cmd_parser.add_parser('submit-file', help='submit your file containing your dataset metadata') parser_config.add_argument('--file', help='spreadsheet containing your dataset metadata') parser_config.add_argument('--action', help='action you want to perform (ADD/MODIFY/DELETE') parser_config.add_argument('--dataset', help='your dataset reference') + parser_config.add_argument( + '--context', + help="Optional context for ingestion (e.g. 'pooled_differentiated' for MSK pooled mode or " + "'unperturbed_multiple' for UCSF mode)." 
+ " If omitted, legacy behavior is used.", + default=None + ) parser_config = cmd_parser.add_parser('view', help='view your dataset') parser_config.add_argument('--dataset', help='your dataset reference') @@ -113,7 +123,8 @@ def parse_args(args): # parser_clear.add_argument('-a', action='store_true', help='clear all - selection and known dirs') parser_list = cmd_parser.add_parser('list', help='list contents of the area') - parser_list.add_argument('-b', action='store_true', help='list all areas in the S3 bucket (authorised users only)') + parser_list.add_argument('-processing', action='store_true', help='access the processed data (authorised users ' + 'only)') # parser_upload = cmd_parser.add_parser('upload', help='upload files to the area') # group_upload = parser_upload.add_mutually_exclusive_group(required=True) @@ -143,7 +154,8 @@ def parse_args(args): group_delete.add_argument('-d', action='store_true', help='delete upload area and contents (authorised users only)') parser_sync = cmd_parser.add_parser('sync', - help='copy data from selected upload area to ingest upload area (authorised users only)') + help='copy data from selected upload area to ingest upload area (authorised ' + 'users only)') parser_sync.add_argument('INGEST_UPLOAD_AREA', help='Ingest upload area', type=valid_ingest_upload_area) ps = [parser] @@ -170,6 +182,24 @@ def parse_args(args): def main(): try: parsed_args = parse_args(sys.argv[1:]) + + if parsed_args.command == 'submit' and parsed_args.type == 'dataset': + if not parsed_args.dataset_type: + print("Error: --dataset-type is required when submitting a dataset", file=sys.stderr) + sys.exit(1) + + if parsed_args.dataset_type == 'raw' and parsed_args.derived_from: + print("Error: --derived-from is not allowed for 'raw' datasets", file=sys.stderr) + sys.exit(1) + + if parsed_args.dataset_type in ['processed', 'filtered'] and not parsed_args.derived_from: + print("Error: --derived-from is required for 'processed' or 'filtered' datasets", 
file=sys.stderr) + sys.exit(1) + + if parsed_args.dataset_type == 'analysis' and not parsed_args.derived_from: + print("Error: --derived-from is required for 'analysis' datasets", file=sys.stderr) + sys.exit(1) + Cmd(parsed_args) except KeyboardInterrupt: # If SIGINT is triggered whilst threads are active (upload/download) we kill the entire process to give the diff --git a/ait/commons/util/command/list.py b/ait/commons/util/command/list.py index ef5261e..cf79917 100644 --- a/ait/commons/util/command/list.py +++ b/ait/commons/util/command/list.py @@ -1,5 +1,10 @@ +import hashlib +import csv + from ait.commons.util.common import format_err from ait.commons.util.local_state import get_selected_area +from ait.commons.util.user_profile import get_profile +from urllib.parse import urlparse def print_area(k, area): @@ -20,6 +25,34 @@ def print_area(k, area): print() +def get_s3_path(): + while True: + s3_path = input("Enter the S3 path (e.g., s3://bucket-name/folder/): ").strip() + parsed_url = urlparse(s3_path) + + if parsed_url.scheme == 's3' and parsed_url.netloc: + return s3_path + else: + print("Invalid S3 path. 
Please enter a valid S3 path starting with 's3://'.") + + +def calculate_md5(s3_client, bucket_name, key): + md5_hash = hashlib.md5() + + try: + # Stream the object in chunks + response = s3_client.get_object(Bucket=bucket_name, Key=key) + + for chunk in response['Body'].iter_chunks(chunk_size=8192): + md5_hash.update(chunk) + + return md5_hash.hexdigest() + except Exception as e: + print(f"Failed to compute MD5 for {key}: {e}") + + return None + + class CmdList: """ admin and user @@ -29,22 +62,67 @@ class CmdList: def __init__(self, aws, args): self.aws = aws self.args = args + self.user = get_profile('morphic-util').username + self.processing = getattr(self.args, 'processing', None) self.s3_cli = self.aws.common_session.client('s3') def run(self): - selected_area = get_selected_area() # select area is a S3 bucket + if self.processing: + if self.user != 'morphic-admin': + return False, "Admin function only" + else: + print("Access granted") + + s3_path = get_s3_path() + self.list_s3_files(s3_path) + + return True, None + + else: + selected_area = get_selected_area() # select area is a S3 bucket + + if not selected_area: + return False, 'No area selected' + + try: + self.list_bucket_contents(selected_area) + # print_count(folder_count + files_count) + return True, None + + except Exception as e: + return False, format_err(e, 'list') + + def list_s3_files(self, s3_path): + parsed_url = urlparse(s3_path) + bucket_name = parsed_url.netloc + prefix = parsed_url.path.lstrip('/') + output_file = 's3_file_md5s.tsv' + + with open(output_file, 'w', newline='') as csvfile: + tsv_writer = csv.writer(csvfile, delimiter=',') + tsv_writer.writerow(['File Name', 'MD5 Hash']) # Write header row + + try: + response = self.s3_cli.list_objects_v2(Bucket=bucket_name, Prefix=prefix) + + if 'Contents' in response: + print(f"\nFiles in '{s3_path}'") - if not selected_area: - return False, 'No area selected' + for obj in response['Contents']: + file_key = obj['Key'] + if not 
file_key.endswith('/'): # Skip folders + md5_hash = calculate_md5(self.s3_cli, bucket_name, file_key) - try: - self.list_bucket_contents(selected_area) - # print_count(folder_count + files_count) - return True, None + if md5_hash: + print(f"{file_key} - MD5: {md5_hash}") + tsv_writer.writerow([file_key, md5_hash]) # Write to file + else: + print("\nNo files found.") + except Exception as e: + print(f"\nError: {e}") - except Exception as e: - return False, format_err(e, 'list') + print(f"\nResults saved to {output_file}") def list_bucket_contents(self, selected_area, prefix=''): result = self.s3_cli.list_objects_v2(Bucket=selected_area, Delimiter='/', Prefix=prefix) diff --git a/ait/commons/util/command/submit.py b/ait/commons/util/command/submit.py index ad17000..5f390d7 100644 --- a/ait/commons/util/command/submit.py +++ b/ait/commons/util/command/submit.py @@ -2,6 +2,7 @@ import traceback import requests +from requests.exceptions import HTTPError import json import pandas as pd import numpy as np @@ -9,8 +10,9 @@ from ait.commons.util.spreadsheet_util import SubmissionError from ait.commons.util.user_profile import get_profile -from ait.commons.util.provider_api_util import APIProvider +from ait.commons.util.provider_api_util import ProviderApi +import time def matching_expression_alteration_and_cell_line(cell_line, expression_alteration): return expression_alteration.expression_alteration_id.replace(" ", @@ -191,7 +193,7 @@ class CmdSubmit: transform(file): Transforms the input file to a JSON object. put_to_provider_api(url, access_token): Sends a PUT request to the provider API. 
""" - BASE_URL = 'http://localhost:8080' + BASE_URL = 'https://api.ingest.archive.morphic.bio' SUBMISSION_ENVELOPE_CREATE_URL = f"{BASE_URL}/submissionEnvelopes/updateSubmissions" SUBMISSION_ENVELOPE_BASE_URL = f"{BASE_URL}/submissionEnvelopes" @@ -206,7 +208,9 @@ def __init__(self, args): self.access_token = get_profile('morphic-util').access_token self.type = getattr(self.args, 'type', None) self.file = getattr(self.args, 'file', None) - self.provider_api = APIProvider(self.BASE_URL) + self.dataset_type = getattr(self.args, 'dataset_type', None) + self.derived_from = getattr(self.args, 'derived_from', None) + self.provider_api = ProviderApi(self.BASE_URL) def run(self): """ @@ -240,13 +244,34 @@ def handle_cell_line(self, Returns: - cell_line_entity_id: Entity ID of the submitted or modified cell line biomaterial. """ + if cell_line.id and action.lower() != 'modify': + print(f"Re-using existing clonal cell line " + f"'{cell_line.biomaterial_id}' ({cell_line.id})") + + # keep the spreadsheet in sync + update_dataframe(cell_lines_df, + cell_line.id, + cell_line.biomaterial_id, + 'clonal_cell_line.label') + + # make sure the biomaterial is linked to the current dataset + self.link_to_dataset('biomaterial', dataset_id, cell_line.id, access_token) + + # (re-)link to its expression-alteration process if necessary + if expression_alterations: + self.link_cell_line_with_expression_alterations( + access_token, cell_line, cell_line.id, expression_alterations + ) + return cell_line.id + + if action.lower() == 'modify': try: success = self.patch_entity('biomaterial', cell_line.id, cell_line.to_dict(), access_token) if success: print(f"Updated cell line: {cell_line.id} / {cell_line.biomaterial_id}") update_dataframe(cell_lines_df, cell_line.id, cell_line.biomaterial_id, - 'cell_line.biomaterial_core.biomaterial_id') + 'clonal_cell_line.label') return cell_line.id else: errors.append(f"Failed to update cell line: {cell_line.id} / {cell_line.biomaterial_id}") @@ -259,7 +284,7 
@@ def handle_cell_line(self, cell_line_entity_id = self.create_cell_line_entity(cell_line, expression_alterations, submission_envelope_id, dataset_id, access_token) update_dataframe(cell_lines_df, cell_line_entity_id, cell_line.biomaterial_id, - 'cell_line.biomaterial_core.biomaterial_id') + 'clonal_cell_line.label') return cell_line_entity_id except Exception as e: errors.append(f"Failed to create cell line: {cell_line.biomaterial_id}") @@ -323,6 +348,7 @@ def handle_differentiated_cell_line(self, cell_line_entity_id, differentiated_cell_line, differentiated_cell_lines_df, + differentiated, submission_envelope_id, dataset_id, access_token, @@ -350,9 +376,15 @@ def handle_differentiated_cell_line(self, print(f"Updated differentiated cell line: {differentiated_cell_line.id} / " f"{differentiated_cell_line.biomaterial_id}") - update_dataframe(differentiated_cell_lines_df, differentiated_cell_line.id, - differentiated_cell_line.biomaterial_id, - 'differentiated_cell_line.biomaterial_core.biomaterial_id') + if differentiated: + update_dataframe(differentiated_cell_lines_df, differentiated_cell_line.id, + differentiated_cell_line.biomaterial_id, + 'differentiated_product.label') + else: + update_dataframe(differentiated_cell_lines_df, differentiated_cell_line.id, + differentiated_cell_line.biomaterial_id, + 'undifferentiated_product.label') + return differentiated_cell_line.id else: errors.append(f"Failed to update differentiated cell line: {differentiated_cell_line.id} / " @@ -370,12 +402,19 @@ def handle_differentiated_cell_line(self, dataset_id, differentiated_cell_line, submission_envelope_id) - update_dataframe(differentiated_cell_lines_df, differentiated_cell_line_id, - differentiated_cell_line.biomaterial_id, - 'differentiated_cell_line.biomaterial_core.biomaterial_id') + + if differentiated: + update_dataframe(differentiated_cell_lines_df, differentiated_cell_line_id, + differentiated_cell_line.biomaterial_id, + 'differentiated_product.label') + else: + 
update_dataframe(differentiated_cell_lines_df, differentiated_cell_line_id, + differentiated_cell_line.biomaterial_id, + 'undifferentiated_product.label') return differentiated_cell_line_id except Exception as e: - errors.append(f"Failed to create differentiated cell line: {differentiated_cell_line.biomaterial_id}") + errors.append( + f"Failed to create differentiated/undifferentiated cell line: {differentiated_cell_line.biomaterial_id}") raise SubmissionError(errors, e) def create_differentiated_cell_line_entity(self, @@ -552,7 +591,7 @@ def handle_library_preparation(self, update_dataframe(library_preparations_df, library_preparation.id, library_preparation.biomaterial_id, - 'library_preparation.biomaterial_core.biomaterial_id') + 'library_preparation.label') return library_preparation.id else: errors.append(f"Failed to update library preparation biomaterial: {library_preparation.id} / " @@ -570,7 +609,7 @@ def handle_library_preparation(self, submission_envelope_id) update_dataframe(library_preparations_df, library_preparation_entity_id, library_preparation.biomaterial_id, - 'library_preparation.biomaterial_core.biomaterial_id') + 'library_preparation.label') return library_preparation_entity_id except Exception as e: @@ -743,7 +782,7 @@ def handle_sequencing_file(self, library_preparation_entity_id, sequencing_file, update_dataframe(sequencing_file_df, sequencing_file.id, sequencing_file.file_name, - 'sequence_file.file_core.file_name') + 'sequence_file.label') return sequencing_file.id else: errors.append( @@ -761,7 +800,7 @@ def handle_sequencing_file(self, library_preparation_entity_id, sequencing_file, submission_envelope_id) update_dataframe(sequencing_file_df, sequencing_file_entity_id, sequencing_file.file_name, - 'sequence_file.file_core.file_name') + 'sequence_file.label') return sequencing_file_entity_id except Exception as e: @@ -895,11 +934,125 @@ def create_process(self, access_token, dataset_id, process_data, submission_enve return 
process_entity_id + def _link_cell_lines_to_children(self, cell_lines, child_lines, dataset_id, submission_envelope_id, access_token, action, errors): + """ + Link each cell line to its corresponding differentiated/undifferentiated children. + Uses the 'input_biomaterial_id' attribute if present; otherwise, falls back to 'cell_line_biomaterial_id'. + """ + print("Linking cell lines with their differentiated/undifferentiated children.") + for cl in cell_lines: + for child in child_lines: + # Use 'input_biomaterial_id' if available; otherwise, use 'cell_line_biomaterial_id' + child_id = getattr(child, "input_biomaterial_id", None) or child.cell_line_biomaterial_id + if cl.biomaterial_id == child_id: + print(f"Linking cell line {cl.biomaterial_id} to child {child.biomaterial_id}.") + self.link_cell_line_and_differentiated_cell_line( + access_token, + cl, + child, + dataset_id, + submission_envelope_id, + action, + errors + ) + + def _process_library_preparations(self, cell_lines, diff_lines, library_preps, dataset_id, submission_envelope_id, access_token, action, errors, context): + """ + Process library preparations and link them to cell lines using different logic based on context. + + If context=="unperturbed_multiple": + - For each target in the library preparation (which is ensured to be a list), + check first for clones (cell lines with a non-null clone_id) and link via + link_clone_to_library_preparation_process. + - If no clone is found, then check for a matching differentiated cell line + and link via link_differentiated_and_library_preparation. + + Otherwise, use legacy exact matching. + """ + print("Processing library preparations for linking.") + for lp in library_preps: + targets = lp.differentiated_biomaterial_id + if not isinstance(targets, list): + targets = [targets] + if context == "unperturbed_multiple": + # New behavior: try matching clones first, then differentiated cell lines. 
+ for target in targets: + linked = False + # Check among clones (cell lines with non-null clone_id) + for cl in cell_lines: + if cl.clone_id is not None and cl.biomaterial_id == target: + print(f"LP {lp.biomaterial_id}: target {target} matches clone {cl.biomaterial_id}.") + self.link_clone_to_library_preparation_process( + cl, + lp, + dataset_id, + submission_envelope_id, + access_token, + action, + errors + ) + linked = True + # If no clone match found, check among differentiated/parental cell lines. + if not linked: + for diff in diff_lines: + if diff.biomaterial_id == target: + print(f"LP {lp.biomaterial_id}: target {target} matches differentiated cell line {diff.biomaterial_id}.") + self.link_differentiated_and_library_preparation( + access_token, + diff, + lp, + dataset_id, + submission_envelope_id, + action, + errors + ) + linked = True + break + if not linked: + err_msg = f"LP {lp.biomaterial_id}: target ID {target} not found among cell lines." + print(err_msg) + errors.append(err_msg) + else: + # Legacy behavior (e.g. for MSK/JAX): exact matching. + for target in targets: + for diff in diff_lines: + if diff.biomaterial_id == target: + print(f"(Legacy) LP {lp.biomaterial_id}: target {target} matches differentiated cell line {diff.biomaterial_id}.") + self.link_differentiated_and_library_preparation( + access_token, + diff, + lp, + dataset_id, + submission_envelope_id, + action, + errors + ) + + def _link_sequencing_files(self, library_preps, sequencing_files, dataset_id, submission_envelope_id, access_token, action, errors): + """ + Link each sequencing file with its corresponding library preparation. 
+ """ + print("Linking sequencing files to library preparations.") + for lp in library_preps: + for sf in sequencing_files: + # Match using the (updated) library preparation biomaterial ID + if lp.biomaterial_id == sf.library_preparation_id: + print(f"Linking sequencing file {sf.file_name} with LP {lp.biomaterial_id}.") + self.link_library_preparation_and_sequencing_file( + access_token, + lp, + sf, + dataset_id, + submission_envelope_id, + action, + errors + ) + def establish_links(self, cell_lines, cell_lines_df, - differentiated_or_undifferentiated_cell_lines, - differentiated_or_undifferentiated_cell_lines_df, + diff_or_undiff_cell_lines, + diff_or_undiff_cell_lines_df, library_preparations, library_preparations_df, sequencing_files, @@ -908,7 +1061,8 @@ def establish_links(self, dataset_id, access_token, action, - errors): + errors, + context=None): """ Handles the submission of multiple types of biomaterials (cell lines, differentiated cell lines, library preparations) @@ -923,58 +1077,33 @@ def establish_links(self, - submission_envelope_id: ID of the submission envelope where entities will be linked. - access_token: Access token for authentication and authorization. + The linking behavior for library preparations depends on the 'context' parameter: + - If context is "unperturbed_multiple", the new behavior is used. + - Otherwise, legacy behavior (exact matching) is applied. + Returns: - Tuple containing updated DataFrames and a status message. 
""" + print("Starting establish_links process.") try: - for cell_line in cell_lines: - for differentiated_or_undifferentiated_cell_line in differentiated_or_undifferentiated_cell_lines: - if cell_line.biomaterial_id == differentiated_or_undifferentiated_cell_line.input_biomaterial_id: - self.link_cell_line_and_differentiated_cell_line(access_token, - cell_line, - differentiated_or_undifferentiated_cell_line, - dataset_id, - submission_envelope_id, - action, - errors) - for differentiated_or_undifferentiated_cell_line in differentiated_or_undifferentiated_cell_lines: - for library_preparation in library_preparations: - if differentiated_or_undifferentiated_cell_line.biomaterial_id == library_preparation.differentiated_biomaterial_id: - self.link_differentiated_and_library_preparation( - access_token, - differentiated_or_undifferentiated_cell_line, - library_preparation, - dataset_id, - submission_envelope_id, - action, - errors) - - for library_preparation in library_preparations: - for sequencing_file in sequencing_files: - if library_preparation.biomaterial_id == sequencing_file.library_preparation_id: - self.link_library_preparation_and_sequencing_file(access_token, - library_preparation, - sequencing_file, - dataset_id, - submission_envelope_id, - action, - errors) + # 1. Link cell lines with their differentiated/undifferentiated children. + self._link_cell_lines_to_children(cell_lines, diff_or_undiff_cell_lines, dataset_id, submission_envelope_id, access_token, action, errors) + + # 2. Process library preparations based on the provided context. + self._process_library_preparations(cell_lines, diff_or_undiff_cell_lines, library_preparations, dataset_id, submission_envelope_id, access_token, action, errors, context) + + # 3. Link sequencing files to library preparations. 
+ self._link_sequencing_files(library_preparations, sequencing_files, dataset_id, submission_envelope_id, access_token, action, errors) message = 'SUCCESS' + print("establish_links completed successfully.") except Exception as e: message = f"An error occurred: {str(e)}" errors.append(message) + print(message) raise SubmissionError(message, e) - # Set DataFrames to None in case of an error - # cell_lines_df = None - # differentiated_cell_lines_df = None - # library_preparations_df = None - # sequencing_files_df = None - return ([cell_lines_df, - differentiated_or_undifferentiated_cell_lines_df, - library_preparations_df, - sequencing_files_df], message) + return ([cell_lines_df, diff_or_undiff_cell_lines_df, library_preparations_df, sequencing_files_df], message) def typed_submission(self, type, file, access_token): """ @@ -1004,6 +1133,34 @@ def typed_submission(self, type, file, access_token): if link_to_study == 'yes': study_id = input("Input study id: ").lower() self.link_dataset_to_study(entity_id, study_id, access_token) + + if self.dataset_type: + print(f"Assigning dataset type '{self.dataset_type}' to dataset ID '{entity_id}'...") + self.provider_api.patch( + f"{self.BASE_URL}/datasets/{entity_id}", + access_token, + {"datasetType": self.dataset_type} + ) + print(f"Dataset '{entity_id}' successfully marked as type '{self.dataset_type}'.") + + # Optional: wait briefly or re-fetch to avoid version mismatch + time.sleep(0.2) + + # Validate and link derivedFrom + if self.derived_from: + derived_ids = [d.strip() for d in self.derived_from.split(",") if d.strip()] + self._validate_dataset_type_and_lineage( + entity_id, self.dataset_type, derived_ids, access_token + ) + print(f"Establishing data lineage: '{entity_id}' is derived from → {derived_ids}") + for source_id in derived_ids: + print(f" ↳ Linking '{entity_id}' ← derived from ← '{source_id}'...") + self.provider_api._put_with_retry( + f"{self.BASE_URL}/datasets/{entity_id}/derivedFrom/{source_id}", + 
access_token + ) + print(f"Lineage successfully established for dataset '{entity_id}'.") + elif type == 'biomaterial': if self.args.dataset is not None: dataset_id = self.args.dataset @@ -1026,6 +1183,65 @@ def typed_submission(self, type, file, access_token): print("Unsupported type") return False, "Unsupported type" + def _validate_dataset_type_and_lineage(self, dataset_id, dataset_type, derived_ids, access_token): + if not derived_ids: + if dataset_type in ['filtered', 'processed', 'analysis']: + raise SubmissionError([ + f"{dataset_type.capitalize()} datasets must be derived from other datasets." + ]) + return + + if dataset_type == "raw": + raise SubmissionError(["Raw datasets cannot be derived from other datasets."]) + + expected_parent_type = { + 'filtered': 'raw', + 'processed': 'raw', + 'analysis': 'processed' + }.get(dataset_type) + + if not expected_parent_type: + return + + for source_id in derived_ids: + source_id = source_id.strip() + if source_id: + try: + dataset_info = self.provider_api.get( + f"{self.BASE_URL}/datasets/{source_id}", + access_token + ) + parent_type = dataset_info.get("datasetType") + if parent_type != expected_parent_type: + raise SubmissionError([ + f"\nDataset was created (ID: {dataset_id}), but derived-from validation failed.", + f"{dataset_type.capitalize()} datasets must be derived from {expected_parent_type} datasets. " + f"Found parent {source_id} of type {parent_type or 'unknown'}." + ]) + except requests.exceptions.HTTPError as e: + if e.response.status_code == 404: + raise SubmissionError([f"Parent dataset '{source_id}' not found. 
Double-check the ID."]) + else: + raise SubmissionError([f"Failed to validate parent dataset {source_id}: {str(e)}"]) + + def _put_with_retry(self, url, access_token, retries=3, delay=0.3): + for attempt in range(retries): + try: + response = self.provider_api.put(url, access_token) + if response.status_code // 100 == 2: + return True + elif response.status_code == 409: + print(f"Conflict detected. Retrying... ({attempt+1}/{retries})") + time.sleep(delay) + else: + response.raise_for_status() + except Exception as e: + if attempt == retries - 1: + print(f"PUT failed: {url} — {str(e)}") + raise + time.sleep(delay) + return False + def create_new_envelope_and_submit_entity(self, input_entity_type, data, access_token): """ Creates and submits a new entity (study, dataset, biomaterial, or process) and returns its ID. @@ -1163,19 +1379,27 @@ def link_biomaterial_to_dataset(self, biomaterial_id, dataset_id, access_token): print(f"Biomaterial linked successfully to dataset: {dataset_id}") + import time + def link_biomaterial_to_process(self, biomaterial_id, process_id, access_token): """ - Links a biomaterial to a process. - - Parameters: - biomaterial_id (str): The ID of the biomaterial. - process_id (str): The ID of the process. - access_token (str): Access token for authorization. + Links a biomaterial to a process with retry logic on 409 Conflict. """ print(f"Linking biomaterial {biomaterial_id} to process {process_id}") - url = f"{self.BASE_URL}/biomaterials/{biomaterial_id}/inputToProcesses" - self.perform_hal_linkage(url, process_id, 'processes', access_token) + url = f"{self.BASE_URL}/biomaterials/{biomaterial_id}/inputToProcesses" + + for attempt in range(3): + try: + self.perform_hal_linkage(url, process_id, 'processes', access_token) + return # success + except requests.exceptions.HTTPError as e: + if e.response.status_code == 409: + print(f"Conflict (409) when linking biomaterial to process. 
Retrying attempt {attempt + 1}/3...") + time.sleep(0.5) + else: + raise # rethrow for anything else + raise RuntimeError(f"Failed to link biomaterial {biomaterial_id} to process {process_id} after retries.") def delete_submission(self, submission_envelope_id, access_token, force_delete=False): """ @@ -1222,8 +1445,11 @@ def perform_hal_linkage(self, url, input_id, link_to, access_token): response = requests.post(url, headers=headers, data=f"{self.BASE_URL}/{link_to}/{input_id}") if response.status_code != 200: - raise Exception(f"Failed to link biomaterial to process {input_id}. " - f"Status code: {response.status_code}, Response: {response.text}") + # Raise with response attached for retry logic to inspect + http_error = HTTPError(f"Failed to link biomaterial to process {input_id}. " + f"Status code: {response.status_code}, Response: {response.text}") + http_error.response = response + raise http_error else: print("Linkage successful") @@ -1281,5 +1507,46 @@ def delete_dataset(self, dataset, access_token): print(f"Deleting {data_file}") self.provider_api.delete(f"{self.BASE_URL}/files/{data_file}", access_token) - print(f"\nDeleting the dataset: {dataset}") - self.provider_api.delete(f"{self.BASE_URL}/datasets/{dataset}", access_token) + # print(f"\nDeleting the dataset: {dataset}") + # self.provider_api.delete(f"{self.BASE_URL}/datasets/{dataset}", access_token) + + def link_clone_to_library_preparation_process(self, cell_line, library_preparation, dataset_id, + submission_envelope_id, access_token, action, errors): + """ + For a clonal cell line (one with a non-null clone_id), this method creates a library preparation process, + then links the clone as input and the existing library preparation biomaterial as derived by the process. + This function only makes the two necessary HAL linkage calls (inputToProcesses and derivedByProcesses) + without creating additional child/parent biomaterial links. 
+ + Returns: + process_entity_id (str): The ID of the created library preparation process. + """ + import logging + logging.debug( + f"Starting LP process linking for clone {cell_line.biomaterial_id} and LP biomaterial {library_preparation.id}") + try: + # Create the library preparation process. + process_entity_id = self.create_process( + access_token, + dataset_id, + get_process_content('library_preparation'), + submission_envelope_id + ) + logging.debug(f"Library preparation process created: {process_entity_id}") + + # Link the clone as input to the process. + input_url = f"{self.BASE_URL}/biomaterials/{cell_line.id}/inputToProcesses" + self.perform_hal_linkage(input_url, process_entity_id, 'processes', access_token) + logging.debug(f"Linked clone {cell_line.biomaterial_id} as input to process {process_entity_id}") + + # Link the existing LP biomaterial as derived by the process. + derived_url = f"{self.BASE_URL}/biomaterials/{library_preparation.id}/derivedByProcesses" + self.perform_hal_linkage(derived_url, process_entity_id, 'processes', access_token) + logging.debug(f"Linked LP biomaterial {library_preparation.id} as derived by process {process_entity_id}") + + return process_entity_id + except Exception as e: + error_msg = f"Failed to link clone {cell_line.biomaterial_id} to LP process: {e}" + logging.error(error_msg) + errors.append(error_msg) + raise SubmissionError(errors, e) diff --git a/ait/commons/util/command/submit_file.py b/ait/commons/util/command/submit_file.py index 2ac893f..04efb9b 100644 --- a/ait/commons/util/command/submit_file.py +++ b/ait/commons/util/command/submit_file.py @@ -10,10 +10,10 @@ from ait.commons.util.command.submit import CmdSubmit, get_entity_id_from_hal_link, create_new_submission_envelope from ait.commons.util.command.upload import CmdUpload from ait.commons.util.user_profile import get_profile -from ait.commons.util.provider_api_util import APIProvider +from ait.commons.util.provider_api_util import ProviderApi from 
ait.commons.util.spreadsheet_util import SpreadsheetSubmitter, ValidationError, \ merge_library_preparation_sequencing_file, merge_cell_line_and_differentiated_cell_line, \ - merge_differentiated_cell_line_and_library_preparation, SubmissionError + merge_differentiated_cell_line_and_library_preparation, SubmissionError, process_library_preparations # Define a class for handling submission of a command file @@ -65,7 +65,7 @@ def _create_expression_alterations(submission_instance, .astype(object)) expression_alterations_df.loc[ expression_alterations_df[ - 'expression_alteration_id'] == expression_alteration.expression_alteration_id, + 'expression_alteration.label'] == expression_alteration.expression_alteration_id, expression_alterations_entity_id_column_name ] = expression_alteration_id @@ -73,7 +73,7 @@ def _create_expression_alterations(submission_instance, class CmdSubmitFile: - BASE_URL = 'http://localhost:8080' + BASE_URL = 'https://api.ingest.dev.archive.morphic.bio/' SUBMISSION_ENVELOPE_CREATE_URL = f"{BASE_URL}/submissionEnvelopes/updateSubmissions" SUBMISSION_ENVELOPE_BASE_URL = f"{BASE_URL}/submissionEnvelopes" @@ -88,11 +88,15 @@ def __init__(self, args): self.user_profile = get_profile('morphic-util') self.access_token = self.user_profile.access_token self.aws = Aws(self.user_profile) - self.provider_api = APIProvider(self.BASE_URL) + self.provider_api = ProviderApi(self.BASE_URL) self.validation_errors = [] self.submission_errors = [] self.submission_envelope_id = None + # Read and store the context argument (if provided) + # For UCSF datasets, you might pass --context unperturbed_multiple. 
+ self.context = getattr(args, "context", None) + # Assign and validate required arguments self.action = self._get_required_arg('action', "Submission action (ADD, MODIFY or DELETE) is mandatory") self.dataset = self._get_required_arg('dataset', ( @@ -101,6 +105,14 @@ def __init__(self, args): "the submit option, and link your dataset to your study before proceeding with this submission." )) + if self.dataset: + try: + self.provider_api.get(f"{self.BASE_URL}/datasets/{self.dataset}", + self.access_token) + except Exception as e: + print(f"Dataset does not exist {self.dataset}") + sys.exit(1) + # Validate file argument only if action is not DELETE if self.action != 'DELETE': self.file = self._get_required_arg('file', "File is mandatory") @@ -178,7 +190,7 @@ def _process_submission(self, submission_instance, list_of_files_in_upload_area) # Extract parsed data expression_alterations = parsed_data['expression_alterations'] expression_alterations_df = parsed_data['expression_alterations_df'] - parent_cell_line_name = parsed_data['parent_cell_line_name'] + parent_cell_line_names = parsed_data['parent_cell_line_names'] cell_lines = parsed_data['cell_lines'] cell_lines_df = parsed_data['cell_lines_df'] differentiated_cell_lines = parsed_data['differentiated_cell_lines'] @@ -208,28 +220,28 @@ def _process_submission(self, submission_instance, list_of_files_in_upload_area) if self._is_add_action(): self._create_submission_envelope() - parent_cell_line_id = self._handle_parent_cell_line(submission_instance, - parent_cell_line_name) - created_expression_alterations = self._handle_expression_alterations( - submission_instance, - expression_alterations, - expression_alterations_df, - parent_cell_line_name, - parent_cell_line_id - ) if cell_lines and cell_lines_df is not None: + if self._is_add_action(): + created_expression_alterations = self._handle_expression_alterations( + submission_instance, + expression_alterations, + expression_alterations_df, + parent_cell_line_names, + 
cell_lines + ) + created_cell_lines = self._create_cell_lines( submission_instance, cell_lines, cell_lines_df, created_expression_alterations) if differentiated_cell_lines and differentiated_cell_lines_df is not None: created_differentiated_or_undifferentiated_cell_lines = self._create_differentiated_cell_lines( - submission_instance, differentiated_cell_lines, differentiated_cell_lines_df) + submission_instance, differentiated_cell_lines, differentiated_cell_lines_df, differentiated) if (undifferentiated_cell_lines and undifferentiated_cell_lines_df is not None and not differentiated): created_differentiated_or_undifferentiated_cell_lines = self._create_differentiated_cell_lines( - submission_instance, undifferentiated_cell_lines, undifferentiated_cell_lines_df) + submission_instance, undifferentiated_cell_lines, undifferentiated_cell_lines_df, differentiated) if library_preparations and library_preparations_df is not None: created_library_preparations = self._create_library_preparations( @@ -286,8 +298,8 @@ def _handle_expression_alterations(self, submission_instance, expression_alterations, expression_alterations_df, - parent_cell_line_name, - parent_cell_line_id): + parent_cell_line_names, + cell_lines): """Handles the creation of expression alterations and links them to the parent cell line if needed.""" created_expression_alterations = [] @@ -296,14 +308,15 @@ def _handle_expression_alterations(self, submission_instance, expression_alterations, expression_alterations_df ) - if created_expression_alterations and parent_cell_line_id: - self._link_parent_cell_line_expression_alteration( - submission_instance, - self.access_token, - parent_cell_line_name, - parent_cell_line_id, - created_expression_alterations - ) + if created_expression_alterations: + for parent_cell_line_name in parent_cell_line_names: + self._link_parent_cell_line_expression_alteration( + submission_instance, + self.access_token, + parent_cell_line_name, + cell_lines, + 
created_expression_alterations + ) return created_expression_alterations @@ -346,11 +359,12 @@ def _parse_spreadsheet(self, parser): # Parse different sections of the spreadsheet expression_alterations, expression_alterations_df = parser.get_expression_alterations( - 'Expression alteration strategy', self.action, self.validation_errors + 'Expression alteration', self.action, self.validation_errors, + context=self.context ) - cell_lines, cell_lines_df, parent_cell_line_name = parser.get_cell_lines( - cell_line_sheet_name, self.action, self.validation_errors + cell_lines, cell_lines_df, parent_cell_line_names = parser.get_cell_lines( + cell_line_sheet_name, self.action, self.validation_errors, context=self.context ) if differentiated_cell_line_sheet_name: @@ -372,25 +386,40 @@ def _parse_spreadsheet(self, parser): if differentiated_cell_lines: differentiated = True merge_cell_line_and_differentiated_cell_line(cell_lines, differentiated_cell_lines, - self.validation_errors) + self.validation_errors, context=self.context) if undifferentiated_cell_lines and not differentiated: merge_cell_line_and_differentiated_cell_line(cell_lines, undifferentiated_cell_lines, - self.validation_errors) + self.validation_errors, context=self.context) - library_preparations, library_preparations_df = parser.get_library_preparations( - 'Library preparation', self.action, self.validation_errors - ) + library_preparations_result = parser.get_library_preparations( + 'Library preparation', differentiated, self.action, self.validation_errors) - if differentiated_cell_lines: - merge_differentiated_cell_line_and_library_preparation( - differentiated_cell_lines, library_preparations, self.validation_errors - ) + if not isinstance(library_preparations_result, tuple) or len(library_preparations_result) != 2: + raise ValueError("Unexpected return from get_library_preparations()") - if undifferentiated_cell_lines and not differentiated: - merge_differentiated_cell_line_and_library_preparation( - 
undifferentiated_cell_lines, library_preparations, self.validation_errors - ) + library_preparations, library_preparations_df = library_preparations_result + + # Handle N:1 relationships for differentiated products in library preparation + for lp in library_preparations: + if "differentiated_biomaterial_id" in lp.__dict__: + differentiated_ids = lp.differentiated_biomaterial_id.split("|") + lp.differentiated_biomaterial_id = differentiated_ids + + if differentiated_cell_lines: + if self.context == "unperturbed_multiple": + # Use the new processing that creates a LP process and links the clone and differentiated product + process_library_preparations(cell_lines, differentiated_cell_lines, library_preparations, self.validation_errors) + else: + # Use the original merge function for differentiated cell lines (for MSK, JAX, etc.) + merge_differentiated_cell_line_and_library_preparation(differentiated_cell_lines, + library_preparations, self.validation_errors, cell_lines=cell_lines) + elif undifferentiated_cell_lines and not differentiated: + if self.context == "unperturbed_multiple": + process_library_preparations(cell_lines, undifferentiated_cell_lines, library_preparations, self.validation_errors) + else: + merge_differentiated_cell_line_and_library_preparation(undifferentiated_cell_lines, + library_preparations, self.validation_errors, cell_lines=cell_lines) sequencing_files, sequencing_files_df = parser.get_sequencing_files( 'Sequence file', self.action, self.validation_errors @@ -404,7 +433,7 @@ def _parse_spreadsheet(self, parser): "expression_alterations_df": expression_alterations_df, "cell_lines": cell_lines, "cell_lines_df": cell_lines_df, - "parent_cell_line_name": parent_cell_line_name, + "parent_cell_line_names": parent_cell_line_names, "differentiated_cell_lines": differentiated_cell_lines, "differentiated_cell_lines_df": differentiated_cell_lines_df, "undifferentiated_cell_lines": undifferentiated_cell_lines, @@ -418,24 +447,26 @@ def 
_parse_spreadsheet(self, parser): "differentiated_cell_line_sheet_name": differentiated_cell_line_sheet_name, "undifferentiated_cell_line_sheet_name": undifferentiated_cell_line_sheet_name } - except Exception: + except Exception as e: + print(f"Exception occurred:", e) + self.validation_errors.append(f"Spreadsheet is invalid {self.file}") return None def _validate_and_upload(self, parsed_data, list_of_files_in_upload_area): - """ # Validate the parsed data and upload the file. + """ validate_sequencing_files(parsed_data['sequencing_files'], list_of_files_in_upload_area, self.dataset, self.validation_errors) """ """ Handle validation errors, including interacting with the user in case of a missing sheet. - """ + """ try: # Exit now if there are validation errors in the spreadsheet if self.validation_errors: raise ValidationError(self.validation_errors) - except ValidationError as e: + except ValidationError: # Check if the error is related to a missing sheet missing_sheet_errors = [msg for msg in self.validation_errors if "Missing sheet" in msg] @@ -443,13 +474,15 @@ def _validate_and_upload(self, parsed_data, list_of_files_in_upload_area): # Extract the sheet name(s) from the errors missing_sheets = ', '.join([msg.split("'")[1] for msg in missing_sheet_errors]) # Ask the user whether to proceed + """ user_response = input( f"A required sheet '{missing_sheets}' is missing. Do you want to proceed anyway? 
(yes/no): ").strip().lower() if user_response == 'yes': print("Proceeding with execution...") else: - print("Execution terminated due to missing required sheet.") - sys.exit(1) + """ + print("Execution terminated due to missing required sheet.") + sys.exit(1) else: # Print the error message # print(f"Validation Error: {e.errors}") @@ -517,11 +550,13 @@ def _create_cell_lines(self, def _create_differentiated_cell_lines(self, submission_instance, differentiated_cell_lines, - differentiated_cell_lines_df): + differentiated_cell_lines_df, + differentiated): for differentiated_cell_line in differentiated_cell_lines: differentiated_cell_line_entity_id = submission_instance.handle_differentiated_cell_line(None, differentiated_cell_line, differentiated_cell_lines_df, + differentiated, self.submission_envelope_id, self.dataset, self.access_token, @@ -590,7 +625,8 @@ def _establish_links(self, self.dataset, self.access_token, self.action, - self.submission_errors + self.submission_errors, + context=self.context ) return updated_dfs, message @@ -603,8 +639,43 @@ def _save_and_upload_results(self, """Save the updated dataframes and upload the results.""" current_time = datetime.now().strftime('%Y-%m-%d_%H-%M-%S') output_file = f"submission_result_{current_time}.xlsx" + try: - # List of updated DataFrames and corresponding sheet names + # Expand gene info if in pooled mode + if self.context == 'pooled_differentiated': + print("Expanding expression alteration strategies for pooled_differentiated mode...") + if expression_alteration_df is not None and not expression_alteration_df.empty: + # Check if it's already flat + if "expression_alteration.genes.altered_gene_symbol" in expression_alteration_df.columns: + print("expression_alteration_df already flattened — skipping expansion.") + else: + expanded_rows = [] + for _, row in expression_alteration_df.iterrows(): + genes = row.get("genes", []) + if isinstance(genes, list): + for gene in genes: + expanded_rows.append({ + 
'expression_alteration.label': row.get('expression_alteration.label'), + 'expression_alteration.parent_protocol_id': row.get('expression_alteration.parent_protocol_id'), + 'expression_alteration.method': row.get('expression_alteration.method'), + 'expression_alteration.genes.allele_specific': gene.get('allele_specific'), + 'expression_alteration.genes.altered_gene_symbol': gene.get('altered_gene_symbol'), + 'expression_alteration.genes.target_gene_hgnc_id': gene.get('target_gene_hgnc_id'), + 'expression_alteration.genes.targeted_genomic_region': gene.get('targeted_genomic_region'), + 'expression_alteration.genes.expected_alteration_type': gene.get('expected_alteration_type'), + 'expression_alteration.genes.editing_strategy': gene.get('editing_strategy'), + 'expression_alteration.genes.altered_locus': gene.get('altered_locus'), + 'expression_alteration.genes.guide_sequence': gene.get('guide_sequence'), + 'Id': row.get('Id') + }) + else: + print(f"Skipping row without gene list: {row}") + expression_alteration_df = pd.DataFrame(expanded_rows) + else: + print("expression_alteration_df is empty or None — no gene info expanded.") + + print(f"Preparing submission result file: {output_file}") + dataframes = [ (updated_dfs[0], cell_line_sheet_name), (updated_dfs[1], differentiated_or_undifferentiated_cell_line_sheet_name), @@ -613,18 +684,26 @@ def _save_and_upload_results(self, (expression_alteration_df, 'Expression alteration strategy') ] - # Create the Excel file and write only non-null DataFrames with pd.ExcelWriter(output_file, engine='openpyxl') as writer: for df, sheet_name in dataframes: - if df is not None: # Check if the DataFrame is not None - df.to_excel(writer, sheet_name=sheet_name, index=False) + if df is None: + print(f"Skipping sheet '{sheet_name}' — DataFrame is None") + continue + if df.empty: + print(f"Skipping sheet '{sheet_name}' — DataFrame is empty") + continue + print(f"Writing sheet '{sheet_name}' with shape {df.shape}") + df.to_excel(writer, 
sheet_name=sheet_name, index=False) + if os.path.exists(output_file): CmdUpload(self.aws, self.args).upload_file(self.dataset, output_file, os.path.basename(output_file)) print(f"File {output_file} uploaded successfully.") else: raise FileNotFoundError(f"The output file {output_file} was not created or cannot be found.") + except Exception as e: - print(f"Failed to upload file {output_file}. Error: {e}, Refer dataset {self.dataset} for tracing metadata") + print(f"Failed to upload file {output_file}. Error: {e}") + print(f"Refer to dataset '{self.dataset}' for metadata tracing.") def _delete_actions(self, submission_envelope_id, submission_instance, error=None): """Handle actions needed when a submission fails.""" @@ -666,12 +745,17 @@ def _link_parent_cell_line_expression_alteration(self, submission_instance, access_token, parent_cell_line_name, - parent_cell_line_id, + cell_lines, created_expression_alterations): - for expression_alteration in created_expression_alterations: - print(f"Linking parent cell line {parent_cell_line_name} " - f"as input to process of {expression_alteration.expression_alteration_id}") - submission_instance.perform_hal_linkage( - f"{self.BASE_URL}/biomaterials/{parent_cell_line_id}/inputToProcesses", - expression_alteration.id, 'processes', access_token - ) + parent_cell_line_id = self._handle_parent_cell_line(submission_instance, parent_cell_line_name) + + for cell_line in cell_lines: + if cell_line.parental_cell_line_name == parent_cell_line_name: + for expression_alteration in created_expression_alterations: + if cell_line.expression_alteration_id == expression_alteration.expression_alteration_id: + print(f"Expression alteration match found, Linking parent cell line {parent_cell_line_name} " + f"as input to process of {expression_alteration.expression_alteration_id}") + submission_instance.perform_hal_linkage( + f"{self.BASE_URL}/biomaterials/{parent_cell_line_id}/inputToProcesses", + expression_alteration.id, 'processes', 
access_token + ) diff --git a/ait/commons/util/command/view.py b/ait/commons/util/command/view.py index aa8fc74..3ba3533 100644 --- a/ait/commons/util/command/view.py +++ b/ait/commons/util/command/view.py @@ -1,5 +1,5 @@ from ait.commons.util.aws_client import Aws -from ait.commons.util.provider_api_util import APIProvider +from ait.commons.util.provider_api_util import ProviderApi from ait.commons.util.user_profile import get_profile @@ -10,7 +10,7 @@ def __init__(self, args): self.args = args self.access_token = get_profile('morphic-util').access_token self.user_profile = get_profile('morphic-util') - self.provider_api = APIProvider(self.base_url) + self.provider_api = ProviderApi(self.base_url) if hasattr(self.args, 'dataset') and self.args.dataset is not None: self.dataset = self.args.dataset diff --git a/ait/commons/util/provider_api_util.py b/ait/commons/util/provider_api_util.py index 851b052..315c3af 100644 --- a/ait/commons/util/provider_api_util.py +++ b/ait/commons/util/provider_api_util.py @@ -1,7 +1,26 @@ +import time import requests +from requests.exceptions import ConnectionError, Timeout + + +def request_with_retries(method, url, headers, params=None, json_data=None, retries=3, timeout=30): + """ + Helper function that attempts an HTTP request with retries and an exponential backoff. + """ + for attempt in range(retries): + try: + response = requests.request(method, url, headers=headers, params=params, json=json_data, timeout=timeout) + return response + except (ConnectionError, Timeout) as e: + if attempt < retries - 1: + wait = 2 ** attempt # exponential backoff + print(f"Request failed (attempt {attempt + 1}/{retries}). 
Retrying in {wait} seconds...") + time.sleep(wait) + else: + raise e -class APIProvider: +class ProviderApi: def __init__(self, base_url): self.base_url = base_url @@ -46,32 +65,26 @@ def request(self, method, url, access_token, params=None, data=None, data_type_i 'Authorization': f'Bearer {access_token}' } - # Send the HTTP request - response = requests.request(method, url, headers=headers, params=params, json=data) + # Use our helper with retries and a 30-second timeout. + response = request_with_retries(method, url, headers, params=params, json_data=data, retries=3, timeout=30) status_code = response.status_code - # Check for unsuccessful status codes if status_code not in (200, 201, 202, 204): print(f"Received {status_code} while executing {method} on {url}") - if method == 'DELETE': - # Return None for unsuccessful DELETE requests return None else: - # Raise an exception for other unsuccessful requests + # This raises the HTTPError raise response.raise_for_status() else: print(f"Received {status_code} while executing {method} on {url}") - # Handle POST requests with data_type_in_hal_link + if method == 'POST' and data_type_in_hal_link: response_data = response.json() - # Return the URL from the HAL link in the response return response_data['_links'][data_type_in_hal_link]['href'] elif method == 'DELETE': - # Return the status code for DELETE requests return status_code else: - # Return the JSON-parsed response data for other successful requests return response.json() def put(self, url, access_token): @@ -89,3 +102,6 @@ def delete(self, url, access_token): def post(self, url, data_type_in_hal_link, data, access_token): return self.request('POST', url, access_token, data=data, data_type_in_hal_link=data_type_in_hal_link) + + def patch(self, url, access_token, data): + return self.request('PATCH', url, access_token, data=data) diff --git a/ait/commons/util/settings/morphic_util.py b/ait/commons/util/settings/morphic_util.py index 382b391..5dec480 100644 --- 
a/ait/commons/util/settings/morphic_util.py +++ b/ait/commons/util/settings/morphic_util.py @@ -1,7 +1,7 @@ from pathlib import Path NAME = 'morphic-util' -VERSION = '1.0.1' +VERSION = '1.0.5-PROD' DESC = 'CLI tool for submitting your analysis data and metadata' AUTHOR = 'dgupta' AUTHOR_EMAIL = 'dgupta@ebi.ac.uk' diff --git a/ait/commons/util/spreadsheet_util.py b/ait/commons/util/spreadsheet_util.py index ff9fec2..798abdb 100644 --- a/ait/commons/util/spreadsheet_util.py +++ b/ait/commons/util/spreadsheet_util.py @@ -3,6 +3,8 @@ import pandas as pd import json import numpy as np +import json +import requests """ class MissingMandatoryFieldError(Exception): @@ -67,23 +69,30 @@ class CellLine: def __init__(self, biomaterial_id, description, - derived_from_accession, + parental_cell_line_name, clone_id, protocol_id, zygosity, cell_type, + treatment_condition, + wt_control_status, expression_alteration_id, - id): + id, + parental_only=False): self.biomaterial_id = biomaterial_id self.description = description - self.derived_from_accession = derived_from_accession + self.parental_cell_line_name = parental_cell_line_name self.clone_id = clone_id self.protocol_id = protocol_id self.zygosity = zygosity self.cell_type = cell_type + self.treatment_condition = treatment_condition + self.wt_control_status = wt_control_status self.differentiated_cell_lines = [] self.expression_alteration_id = expression_alteration_id self.id = id + # New flag: if True, output minimal content (for parental cell lines with no alteration) + self.parental_only = parental_only def add_differentiated_cell_line(self, differentiated_cell_line): self.differentiated_cell_lines.append(differentiated_cell_line) @@ -92,99 +101,141 @@ def __repr__(self): return json.dumps(self.to_dict(), indent=2) def to_dict(self): - content = { - "label": self.biomaterial_id, - "description": self.description, - "derived_from_cell_line": self.derived_from_accession, - "zygosity": self.zygosity, - "type": self.cell_type - 
} - - # Only add optional/custom fields if they are provided - if self.clone_id: - content["clone_id"] = self.clone_id # Not in schema, custom field - - if self.protocol_id: - content["protocol_id"] = self.protocol_id # Not in schema, custom field - - if self.expression_alteration_id: - content["expression_alteration_id"] = self.expression_alteration_id # Not in schema, custom field - - return { - "content": content - } + if self.parental_only: + # Minimal content for a parental cell line not linked to an alteration protocol. + return {"content": self.biomaterial_id} + else: + content = { + "label": self.biomaterial_id, # matches 'label' in schema + "description": self.description, + "zygosity": self.zygosity, + "type": self.cell_type, + "parental_cell_line_name": self.parental_cell_line_name + } + if self.clone_id: + content["clone_id"] = self.clone_id + if self.protocol_id: + content["cell_line_generation_protocol"] = self.protocol_id + if self.treatment_condition: + content["treatment_condition"] = self.treatment_condition + if self.wt_control_status: + content["wt_control_status"] = self.wt_control_status + return {"content": content} + + @classmethod + def from_existing(cls, existing): + content = existing.get("content", {}) + + # The database id you need is either in 'id' or in the self HAL link + db_id = ( + existing.get("id") or + get_entity_id_from_hal_link(existing["_links"]["self"]["href"]) + ) + return cls( + biomaterial_id = content.get("label"), + description = content.get("description"), + parental_cell_line_name = content.get("parental_cell_line_name"), + clone_id = content.get("clone_id"), + protocol_id = content.get("cell_line_generation_protocol"), + zygosity = content.get("zygosity"), + cell_type = content.get("type"), + treatment_condition = content.get("treatment_condition"), + wt_control_status = content.get("wt_control_status"), + expression_alteration_id = None, # keep setter logic in handle_cell_line + id = db_id, # <— store the 
**ObjectId**, not the UUID + parental_only = False + ) class ExpressionAlterationStrategy: def __init__(self, expression_alteration_id, - protocol_id, - allele_specific, - altered_gene_symbols, - altered_gene_ids, - targeted_genomic_region, - expected_alteration_type, - sgrna_target, - protocol_method_text, - altered_locus, - guide_sequence, - id): + parent_protocol_id, + method, + id=None, + allele_specific=None, + altered_gene_symbol=None, + target_gene_hgnc_id=None, + targeted_genomic_region=None, + expected_alteration_type=None, + editing_strategy=None, + altered_locus=None, + guide_sequence=None, + genes=None): self.expression_alteration_id = expression_alteration_id - self.protocol_id = protocol_id + self.parent_protocol_id = parent_protocol_id + self.method = method + self.id = id + + # Legacy mode self.allele_specific = allele_specific - self.altered_gene_symbols = altered_gene_symbols - self.altered_gene_ids = altered_gene_ids + self.altered_gene_symbol = altered_gene_symbol + self.target_gene_hgnc_id = target_gene_hgnc_id self.targeted_genomic_region = targeted_genomic_region self.expected_alteration_type = expected_alteration_type - self.sgrna_target = sgrna_target - self.protocol_method_text = protocol_method_text + self.editing_strategy = editing_strategy self.altered_locus = altered_locus self.guide_sequence = guide_sequence - self.id = id - def __repr__(self): - return json.dumps(self.to_dict(), indent=2) + # New pooled-style gene list + self.genes = genes or [] def to_dict(self): - return { - "content": { - "expression_alteration_label": self.expression_alteration_id, - "protocol_id": self.protocol_id, + # Prefer pooled-style genes array if present + if self.genes: + genes_payload = self.genes + else: + genes_payload = [{ "allele_specific": self.allele_specific, - "altered_gene_symbols": self.altered_gene_symbols, - "altered_gene_ids": self.altered_gene_ids, + "altered_gene_symbol": self.altered_gene_symbol, + "target_gene_hgnc_id": 
self.target_gene_hgnc_id, "targeted_genomic_region": self.targeted_genomic_region, "expected_alteration_type": self.expected_alteration_type, - "sgrna_target": self.sgrna_target, - "protocol_method_text": self.protocol_method_text, + "editing_strategy": self.editing_strategy, "altered_locus": self.altered_locus, - "guide_sequence": self.guide_sequence, - "id": self.id + "guide_sequence": self.guide_sequence + }] + + return { + "content": { + "expression_alteration_id": self.expression_alteration_id, + "parent_protocol_id": self.parent_protocol_id, + "genes": genes_payload, + "method": self.method } } + def __repr__(self): + return json.dumps(self.to_dict(), indent=2) + class DifferentiatedCellLine: def __init__(self, - biomaterial_id, + biomaterial_id, # Maps to 'label' description, - input_biomaterial_id, - protocol_id, - timepoint_value, - timepoint_unit, + cell_line_biomaterial_id, # Maps to 'clonal_cell_line_label' + differentiated_product_protocol_id, + undifferentiated_product_protocol_id, terminally_differentiated, model_system, - id): - self.biomaterial_id = biomaterial_id + timepoint_value, + timepoint_unit, + treatment_condition=None, # New field as per schema + wt_control_status=None, # New field as per schema + id=None): # Optional, custom field + self.biomaterial_id = biomaterial_id # This maps to 'label' in the schema self.description = description - self.input_biomaterial_id = input_biomaterial_id - self.protocol_id = protocol_id - self.timepoint_value = timepoint_value - self.timepoint_unit = timepoint_unit + self.cell_line_biomaterial_id = cell_line_biomaterial_id # Maps to 'clonal_cell_line_label' + self.differentiated_product_protocol_id = differentiated_product_protocol_id + self.undifferentiated_product_protocol_id = undifferentiated_product_protocol_id self.terminally_differentiated = terminally_differentiated self.model_system = model_system + self.timepoint_value = timepoint_value + self.timepoint_unit = timepoint_unit + 
self.treatment_condition = treatment_condition # Added to match schema + self.wt_control_status = wt_control_status # Added to match schema self.library_preparations = [] - self.id = id + self.id = id # Custom field not in the schema def add_library_preparation(self, library_preparation): self.library_preparations.append(library_preparation) @@ -196,18 +247,21 @@ def to_dict(self): content = { "label": self.biomaterial_id, "description": self.description, + "clonal_cell_line_id": self.cell_line_biomaterial_id, + "differentiated_product_protocol_id": self.differentiated_product_protocol_id, + "undifferentiated_product_protocol_id": self.undifferentiated_product_protocol_id, + "terminally_differentiated": self.terminally_differentiated, + "model_system": self.model_system, "timepoint_value": self.timepoint_value, "timepoint_unit": self.timepoint_unit, - "terminally_differentiated": self.terminally_differentiated, - "model_system": self.model_system } - # Only add optional/custom fields if they are provided - if self.input_biomaterial_id: - content["input_biomaterial_id"] = self.input_biomaterial_id # Not in schema, custom field + # Add optional fields only if they are provided + if self.treatment_condition: + content["treatment_condition"] = self.treatment_condition - if self.protocol_id: - content["protocol_id"] = self.protocol_id # Not in schema, custom field + if self.wt_control_status: + content["wt_control_status"] = self.wt_control_status return { "content": content @@ -218,7 +272,6 @@ class LibraryPreparation: def __init__(self, biomaterial_id, protocol_id, - dissociation_protocol_id, differentiated_biomaterial_id, average_fragment_size, input_amount_value, @@ -232,7 +285,6 @@ def __init__(self, id): self.biomaterial_id = biomaterial_id self.protocol_id = protocol_id - self.dissociation_protocol_id = dissociation_protocol_id self.differentiated_biomaterial_id = differentiated_biomaterial_id self.average_fragment_size = average_fragment_size 
self.input_amount_value = input_amount_value @@ -253,7 +305,7 @@ def __repr__(self): return json.dumps(self.to_dict(), indent=2) def to_dict(self): - # Helper function to handle invalid JSON values + # Helper function to handle invalid JSON values (e.g., NaN, infinite) def convert_to_valid_json_value(value): if isinstance(value, float) and (np.isnan(value) or not np.isfinite(value)): return None @@ -261,6 +313,7 @@ def convert_to_valid_json_value(value): content = { "label": self.biomaterial_id, + "library_preparation_protocol_id": self.protocol_id, "average_fragment_size": convert_to_valid_json_value(self.average_fragment_size), "input_amount_value": convert_to_valid_json_value(self.input_amount_value), "input_amount_unit": self.input_amount_unit, @@ -273,12 +326,8 @@ def convert_to_valid_json_value(value): } # Add optional/custom fields if they are provided - if self.protocol_id: - content["protocol_id"] = self.protocol_id # Not in schema, custom field - if self.dissociation_protocol_id: - content["dissociation_protocol_id"] = self.dissociation_protocol_id # Not in schema, custom field if self.differentiated_biomaterial_id: - content["differentiated_biomaterial_id"] = self.differentiated_biomaterial_id # Not in schema, custom field + content["differentiated_biomaterial_id"] = self.differentiated_biomaterial_id return { "content": content @@ -354,6 +403,7 @@ def find_orphans(source_entities, errors): """ Validates that each source entity has a corresponding target entity. + For parental cell lines, a target is considered a match if it starts with the source value. Parameters: source_entities (list): The list of source entities. @@ -365,17 +415,32 @@ def find_orphans(source_entities, Raises: OrphanedEntityError: If a source entity doesn't have a corresponding target entity. 
+ """ for source_entity in source_entities: match_found = False + source_value = getattr(source_entity, source_attr) for target_entity in target_entities: - if getattr(target_entity, target_attr) == getattr(source_entity, source_attr): - match_found = True - break + target_value = getattr(target_entity, target_attr) + + if isinstance(target_value, list): + if source_value in target_value: + match_found = True + break + else: + # For parental cell lines, allow prefix matching. + if source_type == "Cell line (Parental)": + if str(target_value).startswith(str(source_value)): + match_found = True + break + else: + if target_value == source_value: + match_found = True + break if not match_found: - errors.append(f"Orphaned entity {source_type} and ID is {getattr(source_entity, source_attr)}") + errors.append(f"Orphaned entity {source_type} and ID is {source_value}") # raise OrphanedEntityError(source_type, getattr(source_entity, source_attr)) # print(f"VALIDATED: All {source_type.lower()}s have corresponding {target_type.lower()}s.") @@ -429,105 +494,203 @@ def merge_library_preparation_sequencing_file(library_preparations, library_preparation.add_sequencing_file(sequencing_file) -def merge_differentiated_cell_line_and_library_preparation(differentiated_cell_lines, - library_preparations, - errors): +def merge_differentiated_cell_line_and_library_preparation(differentiated_cell_lines, library_preparations, errors, cell_lines=None): """ Merges differentiated cell lines and library preparations based on their biomaterial IDs. - - Parameters: - ----------- - differentiated_cell_lines : list - A list of DifferentiatedCellLine objects to be merged. - library_preparations : list - A list of LibraryPreparation objects to be merged. - - Returns: - -------- - None - - Raises: - ------ - MissingEntityError: - If a library preparation does not have a corresponding differentiated cell line. 
+ An extra optional parameter 'cell_lines' is accepted to avoid unexpected keyword argument errors. """ + try: + find_orphans( + source_entities=differentiated_cell_lines, + target_entities=library_preparations, + source_attr="biomaterial_id", + target_attr="differentiated_biomaterial_id", + source_type="Differentiated Cell Line", + target_type="Library Preparation", + errors=errors + ) - find_orphans( - source_entities=differentiated_cell_lines, - target_entities=library_preparations, - source_attr="biomaterial_id", - target_attr="differentiated_biomaterial_id", - source_type="Differentiated Cell line", - target_type="Library Preparation", - errors=errors - ) + missing_parent_entity_error = MissingParentEntityError() + differentiated_ids = {diff_cell.biomaterial_id for diff_cell in differentiated_cell_lines} - missing_parent_entity_error = MissingParentEntityError() + for library_preparation in library_preparations: + diff_biomaterial_id = library_preparation.differentiated_biomaterial_id - differentiated_ids = {diff_cell.biomaterial_id for diff_cell in differentiated_cell_lines} + if isinstance(diff_biomaterial_id, list): + missing_ids = [id_ for id_ in diff_biomaterial_id if id_ not in differentiated_ids] + if missing_ids: + missing_parent_entity_error.add_error("Differentiated Cell Line", "Library Preparation", ", ".join(missing_ids), errors) + else: + if diff_biomaterial_id not in differentiated_ids: + missing_parent_entity_error.add_error("Differentiated Cell Line", "Library Preparation", diff_biomaterial_id, errors) - for library_preparation in library_preparations: - if library_preparation.differentiated_biomaterial_id not in differentiated_ids: - missing_parent_entity_error.add_error("Differentiated Cell Line", - "Library Preparation", - library_preparation.biomaterial_id, - errors) + for diff_cell in differentiated_cell_lines: + for library_preparation in library_preparations: + diff_biomaterial_id = library_preparation.differentiated_biomaterial_id - for 
differentiated_cell_line in differentiated_cell_lines: - for library_preparation in library_preparations: - if library_preparation.differentiated_biomaterial_id == differentiated_cell_line.biomaterial_id: - differentiated_cell_line.add_library_preparation(library_preparation) + if isinstance(diff_biomaterial_id, list): + if diff_cell.biomaterial_id in diff_biomaterial_id: + diff_cell.add_library_preparation(library_preparation) + elif diff_biomaterial_id == diff_cell.biomaterial_id: + diff_cell.add_library_preparation(library_preparation) + + except Exception as e: + print(f"Exception occurred during merging of differentiated cell lines and library preparations: {e}") -def merge_cell_line_and_differentiated_cell_line(cell_lines, - differentiated_cell_lines, - errors): +def merge_cell_line_and_differentiated_cell_line(cell_lines, differentiated_cell_lines, errors, context=None): """ Merges cell lines and differentiated cell lines based on their biomaterial IDs. + Only parental cell lines (those with clone_id is None) are used for linking. + For parental cell lines, a prefix match is used. + """ + # Filter to include only parental cell lines. + if context == "unperturbed_multiple": + parental_cell_lines = [cl for cl in cell_lines if cl.clone_id is None] + else: + parental_cell_lines = cell_lines + try: + find_orphans( + source_entities=parental_cell_lines, + target_entities=differentiated_cell_lines, + source_attr="biomaterial_id", + target_attr="cell_line_biomaterial_id", + source_type="Cell line (Parental)", + target_type="Differentiated Cell line", + errors=errors + ) - Parameters: - ----------- - cell_lines : list - A list of CellLine objects to be merged. - differentiated_cell_lines : list - A list of DifferentiatedCellLine objects to be merged. 
+ missing_parent_entity_error = MissingParentEntityError() - Returns: - -------- - None + parental_ids = {cl.biomaterial_id for cl in parental_cell_lines} + for diff_cell in differentiated_cell_lines: + if diff_cell.cell_line_biomaterial_id not in parental_ids: + missing_parent_entity_error.add_error("Cell Line", "Differentiated Cell line", diff_cell.cell_line_biomaterial_id, errors) - Raises: - ------ - MissingEntityError: - If a differentiated cell line does not have a corresponding cell line. + for cl in parental_cell_lines: + for diff_cell in differentiated_cell_lines: + if diff_cell.cell_line_biomaterial_id == cl.biomaterial_id: + cl.add_differentiated_cell_line(diff_cell) + + except Exception as e: + print(f"Exception occurred during merging: {e}") + + +def target_in_ids(target, id_set): + """ + Returns True if the target (which may be a string or a list of strings) + has any element in id_set. """ + if isinstance(target, list): + return any(item in id_set for item in target) + else: + return target in id_set - find_orphans( - source_entities=cell_lines, - target_entities=differentiated_cell_lines, - source_attr="biomaterial_id", - target_attr="input_biomaterial_id", - source_type="Cell line", - target_type="Differentiated Cell line", - errors=errors - ) - missing_parent_entity_error = MissingParentEntityError() - cell_line_ids = {cell_line.biomaterial_id for cell_line in cell_lines} +def merge_differentiated_cell_line_and_library_preparation_for_lp(differentiated_cell_lines, library_preps, errors, cell_lines=None): + """ + Merges library preparations with differentiated cell lines. + Only processes library preparations whose differentiated_biomaterial_id is found in differentiated_cell_lines. + """ + # Create a set of differentiated cell line IDs (these should be strings) + diff_ids = {d.biomaterial_id for d in differentiated_cell_lines} + # Use target_in_ids() to allow lp.differentiated_biomaterial_id to be a list or a string. 
+ library_preps_for_diff = [lp for lp in library_preps if target_in_ids(lp.differentiated_biomaterial_id, diff_ids)] + + if not library_preps_for_diff: + return # Nothing to merge for differentiated cell lines + + try: + find_orphans( + source_entities=differentiated_cell_lines, + target_entities=library_preps_for_diff, + source_attr="biomaterial_id", + target_attr="differentiated_biomaterial_id", + source_type="Differentiated Cell Line", + target_type="Library Preparation", + errors=errors + ) - for differentiated_cell_line in differentiated_cell_lines: - if differentiated_cell_line.input_biomaterial_id not in cell_line_ids: - missing_parent_entity_error.add_error("Cell Line", - "Differentiated Cell line", - differentiated_cell_line.biomaterial_id, - errors) + missing_parent_entity_error = MissingParentEntityError() + for lp in library_preps_for_diff: + # We check using the helper to avoid errors if lp.differentiated_biomaterial_id is a list. + if not target_in_ids(lp.differentiated_biomaterial_id, diff_ids): + missing_parent_entity_error.add_error("Differentiated Cell Line", "Library Preparation", str(lp.differentiated_biomaterial_id), errors) + + for diff_cell in differentiated_cell_lines: + for lp in library_preps_for_diff: + # If the target is a list, check if the diff_cell's id is in that list. + if isinstance(lp.differentiated_biomaterial_id, list): + if diff_cell.biomaterial_id in lp.differentiated_biomaterial_id: + diff_cell.add_library_preparation(lp) + elif lp.differentiated_biomaterial_id == diff_cell.biomaterial_id: + diff_cell.add_library_preparation(lp) + except Exception as e: + print(f"Exception during merging of differentiated cell lines and library preparations: {e}") + + +def merge_cell_line_and_library_preparation_for_lp(cell_lines, library_preps, errors): + """ + Merges library preparations with clonal cell lines. + Only processes library preparations whose differentiated_biomaterial_id is found among clonal cell lines. 
+ """ + # Build a set of clonal cell line IDs (those with non-null clone_id) + clonal_ids = {cl.biomaterial_id for cl in cell_lines if cl.clone_id is not None} + library_preps_for_clones = [lp for lp in library_preps if target_in_ids(lp.differentiated_biomaterial_id, clonal_ids)] + + if not library_preps_for_clones: + return # Nothing to merge for clonal cell lines + + try: + find_orphans( + source_entities=cell_lines, + target_entities=library_preps_for_clones, + source_attr="biomaterial_id", + target_attr="differentiated_biomaterial_id", + source_type="Cell Line (Clonal)", + target_type="Library Preparation", + errors=errors + ) + + missing_parent_entity_error = MissingParentEntityError() + for lp in library_preps_for_clones: + if not target_in_ids(lp.differentiated_biomaterial_id, clonal_ids): + missing_parent_entity_error.add_error("Cell Line", "Library Preparation", str(lp.differentiated_biomaterial_id), errors) - for cell_line in cell_lines: - for differentiated_cell_line in differentiated_cell_lines: - if differentiated_cell_line.input_biomaterial_id == cell_line.biomaterial_id: - cell_line.add_differentiated_cell_line(differentiated_cell_line) + for cl in cell_lines: + if cl.clone_id is not None: + for lp in library_preps_for_clones: + if isinstance(lp.differentiated_biomaterial_id, list): + if cl.biomaterial_id in lp.differentiated_biomaterial_id: + cl.add_library_preparation(lp) + elif lp.differentiated_biomaterial_id == cl.biomaterial_id: + cl.add_library_preparation(lp) + except Exception as e: + print(f"Exception during merging of clonal cell lines and library preparations: {e}") +def process_library_preparations(cell_lines, differentiated_cell_lines, library_preps, errors): + """ + For UCSF ingestion, process library preparations only for differentiated cell lines. + Linking for clonal cell lines is deferred to the submission linking phase. + """ + # Process only the library preparations for differentiated (parental) cell lines. 
+ diff_ids = {d.biomaterial_id for d in differentiated_cell_lines} + library_preps_for_diff = [lp for lp in library_preps if target_in_ids(lp.differentiated_biomaterial_id, diff_ids)] + if library_preps_for_diff: + merge_differentiated_cell_line_and_library_preparation_for_lp(differentiated_cell_lines, library_preps_for_diff, errors) + +def find_existing_biomaterial_by_label(label, ingest_api_base): + url = f"{ingest_api_base}/biomaterials/search/findByContentLabel?label={label}" + print(f"Find_existing_biomaterial_by_label URL '{url}'") + response = requests.get(url) + print(f"Find_existing_biomaterial_by_label response '{response}'") + if response.status_code == 200: + results = response.json() + biomaterials = results.get("_embedded", {}).get("biomaterials", []) + return biomaterials[0] if biomaterials else None + return None + class SpreadsheetSubmitter: """ A class for parsing and processing data from an Excel spreadsheet containing information about @@ -590,15 +753,16 @@ def __init__(self, file_path): def list_sheets(self): """ - Retrieves the names of all sheets present in the Excel file. + Retrieves the names of all sheets present in the Excel file, + trimming any leading or trailing spaces. Returns: -------- list - A list of sheet names present in the Excel file. + A list of trimmed sheet names present in the Excel file. """ xls = pd.ExcelFile(self.file_path, engine='openpyxl') - return xls.sheet_names + return [sheet_name.strip() for sheet_name in xls.sheet_names] def input_file_to_data_frames(self, sheet_name, action): if action.upper() == 'MODIFY': @@ -623,176 +787,147 @@ def input_file_to_data_frames(self, sheet_name, action): return df - def parse_cell_lines(self, - sheet_name, - action, - errors): + def parse_cell_lines(self, sheet_name, action, errors, context=None): """ - Parses data related to cell lines from a specified sheet in the Excel file. + Parses cell lines from the clonal cell line sheet. 
- Parameters: - ----------- - sheet_name : str - The name of the sheet containing cell line data. + In UCSF datasets, each row represents a clone (e.g. iPSC_Rep1) that has an associated + parental cell line name (e.g. KOLF2.2J_AAVS1_inducible_CRISPRi). Since clones go directly + to library preparation and the parental cell line is used for differentiation, this function + creates a separate parental cell line entity if its label is not found among the clones. Returns: - -------- - tuple - A tuple containing: - - list of CellLine objects parsed from the specified sheet. - - pd.DataFrame with the parsed data. + combined (list): A list of CellLine objects including both clones and auto-generated parental cell lines. + df_filtered (pd.DataFrame): The filtered DataFrame. + parental_names (list): A list of unique parental cell line names extracted from the sheet. """ df = self.input_file_to_data_frames(sheet_name=sheet_name, action=action) df.columns = df.columns.str.strip() - parent_cell_line_names = [] - - # Check if the required column exists - if 'cell_line.biomaterial_core.biomaterial_id' not in df.columns: - errors.append( - f"The column 'cell_line.biomaterial_core.biomaterial_id' does not exist in the {sheet_name} sheet. " - f"The rest of the file will not be processed") - return [], df + if 'clonal_cell_line.label' not in df.columns: + errors.append(f"The column 'clonal_cell_line.label' does not exist in the {sheet_name} sheet.") + return [], df, [] - # Filter rows where biomaterial_id is not null - df = df[df['cell_line.biomaterial_core.biomaterial_id'].notna()] - # Replace invalid float values with None + # Filter out placeholder rows. 
+ df = df[df['clonal_cell_line.label'].notna()] df = df.map(lambda x: None if isinstance(x, float) and (np.isnan(x) or not np.isfinite(x)) else x) - # Define columns to check for invalid starting values - cols_to_check = ['cell_line.biomaterial_core.biomaterial_id'] - invalid_start_values = ( - 'FILL OUT INFORMATION BELOW THIS ROW', 'A unique ID for the biomaterial.', - 'cell_line.biomaterial_core.biomaterial_id' - ) - # Filter out rows with invalid starting values - mask = df[cols_to_check].apply(lambda x: ~x.astype(str).str.startswith(invalid_start_values)).all(axis=1) - df_filtered = df[mask] - # Check for a unique value in 'cell_line.derived_cell_line_accession' - derived_col = 'cell_line.derived_cell_line_accession' - - if derived_col in df_filtered.columns: - parent_cell_line_names = df_filtered[derived_col].dropna().unique() - - if len(parent_cell_line_names) != 1: - errors.append( - f"The column '{derived_col}' must have the same value across all rows. Found values: {parent_cell_line_names}") - - return [], df + mask = df['clonal_cell_line.label'].astype(str).str.startswith('FILL OUT INFORMATION BELOW THIS ROW') + df_filtered = df[~mask] - # Process rows to create CellLine objects cell_lines = [] - + parental_names = set() for _, row in df_filtered.iterrows(): - biomaterial_id = row['cell_line.biomaterial_core.biomaterial_id'] - derived_from_accession = row.get('cell_line.derived_cell_line_accession') - cell_type = row.get('cell_line.type') - expression_alteration_id = row.get('expression_alteration_id') - - # Error handling for missing mandatory fields - if pd.isnull(biomaterial_id): - errors.append("Biomaterial ID cannot be null in any row of the Cell line sheet.") - - if any(pd.isnull(field) for field in [derived_from_accession, cell_type]): - errors.append( - f"Mandatory fields (derived_accession, cell_type, expression_alteration_id) are required for Cell " - f"line entity: {biomaterial_id}") + label = row['clonal_cell_line.label'] + parent_name = 
row.get('clonal_cell_line.parental_cell_line_name') + + print(f"Examining clonal cell line '{label}'") + existing = find_existing_biomaterial_by_label(label, ingest_api_base="https://api.ingest.archive.morphic.bio") + + if existing: + print(f"Reusing existing clonal cell line '{label}'") + cell_line = CellLine.from_existing(existing) + # Update expression alteration ID if it's provided in the spreadsheet + ea_id = row.get('expression_alteration.label') + if ea_id: + cell_line.expression_alteration_id = ea_id + cell_lines.append(cell_line) + continue cell_lines.append( CellLine( - biomaterial_id=biomaterial_id, - description=row.get('cell_line.biomaterial_core.biomaterial_description'), - derived_from_accession=derived_from_accession, - clone_id=row.get('cell_line.clone_id'), - protocol_id=row.get('gene_expression_alteration_protocol.protocol_core.protocol_id'), - zygosity=row.get('cell_line.zygosity'), - cell_type=cell_type, - expression_alteration_id=expression_alteration_id, + biomaterial_id=label, + description=row.get('clonal_cell_line.description'), + parental_cell_line_name=parent_name, + clone_id=row.get('clonal_cell_line.clone_id'), + protocol_id=row.get('clonal_cell_line.cell_line_generation_protocol'), + zygosity=row.get('clonal_cell_line.zygosity'), + cell_type=row.get('clonal_cell_line.type'), + treatment_condition=row.get('clonal_cell_line.treatment_condition'), + wt_control_status=row.get('clonal_cell_line.wt_control_status'), + expression_alteration_id=row.get('expression_alteration.label'), id=row.get('Id') ) ) + if parent_name and parent_name != label: + parental_names.add(parent_name) + + # Only auto‑generate parental cell lines if we’re in UCSF mode. 
+ if context == "unperturbed_multiple": + parental_cell_lines = [] + for parent in parental_names: + if parent not in {cl.biomaterial_id for cl in cell_lines}: + parental_cell_lines.append( + CellLine( + biomaterial_id=parent, + description="Auto-generated parental cell line from clonal cell lines", + parental_cell_line_name=None, + clone_id=None, + protocol_id=None, + zygosity=None, + cell_type=None, + treatment_condition=None, + wt_control_status=None, + expression_alteration_id=None, + id=None, + parental_only=True + ) + ) + combined = parental_cell_lines + cell_lines + else: + combined = cell_lines # Legacy mode: use only the clones. - return cell_lines, df_filtered, parent_cell_line_names[0] + return combined, df_filtered, list(parental_names) - def parse_differentiated_cell_lines(self, - sheet_name, - action, - errors): + def parse_differentiated_cell_lines(self, sheet_name, action, errors): """ Parses data related to differentiated cell lines from a specified sheet in the Excel file. - - Parameters: - ----------- - sheet_name : str - The name of the sheet containing differentiated cell line data. - column_mapping : dict - A dictionary mapping column names in the sheet to expected attribute names. - - Returns: - -------- - list - A list of DifferentiatedCellLine objects parsed from the specified sheet. + Uses the 'clonal_cell_line.parental_cell_line_name' (or falls back to 'clonal_cell_line.label') + to link differentiated products to the parental cell line. 
""" df = self.input_file_to_data_frames(sheet_name=sheet_name, action=action) df.columns = df.columns.str.strip() - # df = df.rename(columns=column_mapping) - # Remove unnamed columns (columns without headers) - # df = df.loc[:, ~df.columns.str.startswith('Unnamed')] - # Check if the required column exists - if 'differentiated_cell_line.biomaterial_core.biomaterial_id' not in df.columns: - errors.append(f"The column 'differentiated_cell_line.biomaterial_core.biomaterial_id' does not " - f"exist in {sheet_name} name. The rest of the file will not be processed") + if 'differentiated_product.label' not in df.columns: + errors.append(f"The column 'differentiated_product.label' does not exist in {sheet_name}. The rest of the file will not be processed") return [], df - # Filter rows where biomaterial_id is not null - df = df[df['differentiated_cell_line.biomaterial_core.biomaterial_id'].notna()] + df = df[df['differentiated_product.label'].notna()] df = df.map(lambda x: None if isinstance(x, float) and (np.isnan(x) or not np.isfinite(x)) else x) - # Define columns to check for values starting with 'ABC' or 'XYZ' - cols_to_check = ['differentiated_cell_line.biomaterial_core.biomaterial_id'] - # Create a mask to filter rows where any of the specified columns start with 'ABC' or 'XYZ' + cols_to_check = ['differentiated_product.label'] mask = df[cols_to_check].apply(lambda x: ~x.astype(str).str.startswith( ('FILL OUT INFORMATION BELOW THIS ROW', 'A unique ID for the biomaterial.', 'differentiated_cell_line.biomaterial_core.biomaterial_id'))).all(axis=1) - # Apply the mask to filter out rows df_filtered = df[mask] - # Check for mandatory fields and create Differentiated CellLine objects - differentiated_cell_lines = [] + differentiated_cell_lines = [] for _, row in df_filtered.iterrows(): - differentiated_biomaterial_id = row['differentiated_cell_line.biomaterial_core.biomaterial_id'] - biomaterial_id = row.get('cell_line.biomaterial_core.biomaterial_id') - - # Check if 
biomaterial_id is null - if pd.isnull(differentiated_biomaterial_id): - errors.append("Differentiated Cell line ID cannot be null in any row of the Differentiated Cell line " - "sheet.") - # raise MissingMandatoryFieldError("Differentiated Cell line ID cannot be null in any row.") - - # Check if derived_accession and cell_type are present - if pd.isnull(biomaterial_id): - errors.append(f"Input Cell line ID cannot be null for Differentiated Cell line: " - f"{differentiated_biomaterial_id}") - """ - raise MissingMandatoryFieldError( - "Input Cell line ID cannot be null. " + differentiated_biomaterial_id) - """ + label = row['differentiated_product.label'] + # Attempt to get the parental cell line name; if missing, fallback to the provided clonal label. + parent_biomaterial_id = row.get('clonal_cell_line.parental_cell_line_name') or row.get('clonal_cell_line.label') + if pd.isnull(label): + errors.append("Differentiated Cell line ID cannot be null in any row of the Differentiated Cell line sheet.") + if pd.isnull(parent_biomaterial_id): + errors.append(f"Parental Cell line ID cannot be null for Differentiated Cell line: {label}") - # Create DifferentiatedCellLine objects from filtered DataFrame rows differentiated_cell_lines.append( DifferentiatedCellLine( - biomaterial_id=differentiated_biomaterial_id, - description=row.get('differentiated_cell_line.biomaterial_core.biomaterial_description'), - input_biomaterial_id=biomaterial_id, - protocol_id=row.get('differentiation_protocol.protocol_core.protocol_id'), - timepoint_value=row.get('differentiated_cell_line.timepoint_value'), - timepoint_unit=row.get('differentiated_cell_line.timepoint_unit.text'), - terminally_differentiated=row.get('differentiated_cell_line.terminally_differentiated'), - model_system=row.get('differentiated_cell_line.model_organ.text'), + biomaterial_id=label, + description=row.get('differentiated_product.description'), + cell_line_biomaterial_id=parent_biomaterial_id, # Linking to parental cell 
line + differentiated_product_protocol_id=row.get('differentiated_product.differentiated_product_protocol_id'), + undifferentiated_product_protocol_id=None, + treatment_condition=row.get('differentiated_product.treatment_condition'), + wt_control_status=row.get('differentiated_product.wt_control_status'), + timepoint_value=row.get('differentiated_product.timepoint_value'), + timepoint_unit=row.get('differentiated_product.timepoint_unit'), + terminally_differentiated=row.get('differentiated_product.final_timepoint'), + model_system=row.get('differentiated_product.model_system'), id=row.get('Id') ) ) return differentiated_cell_lines, df_filtered + # TODO: review def parse_undifferentiated_cell_lines(self, sheet_name, action, @@ -819,16 +954,16 @@ def parse_undifferentiated_cell_lines(self, # df = df.loc[:, ~df.columns.str.startswith('Unnamed')] # Check if the required column exists - if 'differentiated_cell_line.biomaterial_core.biomaterial_id' not in df.columns: - errors.append(f"The column 'differentiated_cell_line.biomaterial_core.biomaterial_id' does not " - f"exist in {sheet_name}. The rest of the file will not be processed") + if 'undifferentiated_product.label' not in df.columns: + errors.append(f"The column 'undifferentiated_product.label' does not " + f"exist in {sheet_name} name. 
The rest of the file will not be processed") return [], df # Filter rows where biomaterial_id is not null - df = df[df['differentiated_cell_line.biomaterial_core.biomaterial_id'].notna()] + df = df[df['undifferentiated_product.label'].notna()] df = df.map(lambda x: None if isinstance(x, float) and (np.isnan(x) or not np.isfinite(x)) else x) # Define columns to check for values starting with 'ABC' or 'XYZ' - cols_to_check = ['differentiated_cell_line.biomaterial_core.biomaterial_id'] + cols_to_check = ['undifferentiated_product.label'] # Create a mask to filter rows where any of the specified columns start with 'ABC' or 'XYZ' mask = df[cols_to_check].apply(lambda x: ~x.astype(str).str.startswith( ('FILL OUT INFORMATION BELOW THIS ROW', 'A unique ID for the biomaterial.', @@ -839,19 +974,20 @@ def parse_undifferentiated_cell_lines(self, undifferentiated_cell_lines = [] for _, row in df_filtered.iterrows(): - differentiated_biomaterial_id = row['differentiated_cell_line.biomaterial_core.biomaterial_id'] - biomaterial_id = row.get('cell_line.biomaterial_core.biomaterial_id') + label = row['undifferentiated_product.label'] + parent_biomaterial_id = row.get('clonal_cell_line.label') # Check if biomaterial_id is null - if pd.isnull(differentiated_biomaterial_id): - errors.append("Differentiated Cell line ID cannot be null in any row of the Differentiated Cell line " - "sheet.") + if pd.isnull(label): + errors.append( + "Undifferentiated Cell line ID cannot be null in any row of the Undifferentiated Cell line " + "sheet.") # raise MissingMandatoryFieldError("Differentiated Cell line ID cannot be null in any row.") # Check if derived_accession and cell_type are present - if pd.isnull(biomaterial_id): - errors.append(f"Input Cell line ID cannot be null for Differentiated Cell line: " - f"{differentiated_biomaterial_id}") + if pd.isnull(parent_biomaterial_id): + errors.append(f"Input Cell line ID cannot be null for Undifferentiated Cell line: " + f"{label}") """ raise 
MissingMandatoryFieldError( "Input Cell line ID cannot be null. " + differentiated_biomaterial_id) @@ -860,14 +996,18 @@ def parse_undifferentiated_cell_lines(self, # Create DifferentiatedCellLine objects from filtered DataFrame rows undifferentiated_cell_lines.append( DifferentiatedCellLine( - biomaterial_id=differentiated_biomaterial_id, - description=row.get('differentiated_cell_line.biomaterial_core.biomaterial_description'), - input_biomaterial_id=biomaterial_id, - protocol_id=row.get('differentiation_protocol.protocol_core.protocol_id'), - timepoint_value=row.get('differentiated_cell_line.timepoint_value'), - timepoint_unit=row.get('differentiated_cell_line.timepoint_unit.text'), - terminally_differentiated=row.get('differentiated_cell_line.terminally_differentiated'), - model_system=row.get('differentiated_cell_line.model_organ.text'), + biomaterial_id=label, + description=row.get('undifferentiated_product.description'), + cell_line_biomaterial_id=parent_biomaterial_id, + differentiated_product_protocol_id=None, + undifferentiated_product_protocol_id=row.get( + 'undifferentiated_product.undifferentiated_product_protocol_id'), + treatment_condition=row.get('undifferentiated_product.treatment_condition'), + wt_control_status=row.get('undifferentiated_product.wt_control_status'), + timepoint_value=row.get('undifferentiated_product.timepoint_value'), + timepoint_unit=row.get('undifferentiated_product.timepoint_unit'), + terminally_differentiated=row.get('undifferentiated_product.terminally_differentiated'), + model_system=row.get('undifferentiated_product.model_system'), id=row.get('Id') ) ) @@ -876,6 +1016,7 @@ def parse_undifferentiated_cell_lines(self, def parse_library_preparations(self, sheet_name, + differentiated, action, errors): """ @@ -898,24 +1039,36 @@ def parse_library_preparations(self, # df = df.loc[:, ~df.columns.str.startswith('Unnamed')] # Check if the required column exists required_columns = [ - 
'library_preparation.biomaterial_core.biomaterial_id', - 'dissociation_protocol.protocol_core.protocol_id', - 'differentiated_cell_line.biomaterial_core.biomaterial_id', - 'library_preparation_protocol.protocol_core.protocol_id' + 'library_preparation.label', + 'differentiated_product.label', + 'undifferentiated_product.label', + 'library_preparation.library_preparation_protocol_id' ] for col in required_columns: if col not in df.columns: - errors.append(f"The column '{col}' does not exist in the {sheet_name} sheet. " - f"The rest of the file will not be processed") + if col == 'differentiated_product.label' and differentiated: + errors.append(f"The column '{col}' does not exist in the {sheet_name} sheet. " + f"The rest of the file will not be processed") - return [], df + return [], df + elif col == 'undifferentiated_product.label' and not differentiated: + errors.append(f"The column '{col}' does not exist in the {sheet_name} sheet. " + f"The rest of the file will not be processed") + + return [], df + else: + if col not in ('differentiated_product.label', 'undifferentiated_product.label'): + errors.append(f"The column '{col}' does not exist in the {sheet_name} sheet. 
" + f"The rest of the file will not be processed") + + return [], df # Filter rows where biomaterial_id is not null - df = df[df['library_preparation.biomaterial_core.biomaterial_id'].notna()] + df = df[df['library_preparation.label'].notna()] df = df.map(lambda x: None if isinstance(x, float) and (np.isnan(x) or not np.isfinite(x)) else x) # Define columns to check for values starting with 'ABC' or 'XYZ' - cols_to_check = ['library_preparation.biomaterial_core.biomaterial_id'] + cols_to_check = ['library_preparation.label'] # Create a mask to filter rows where any of the specified columns start with 'ABC' or 'XYZ' mask = df[cols_to_check].apply(lambda x: ~x.astype(str).str.startswith( ('FILL OUT INFORMATION BELOW THIS ROW', 'A unique ID for the biomaterial.', @@ -926,21 +1079,26 @@ def parse_library_preparations(self, library_preparations = [] for _, row in df_filtered.iterrows(): - library_preparation_id = row['library_preparation.biomaterial_core.biomaterial_id'] - dissociation_protocol_id = row.get('dissociation_protocol.protocol_core.protocol_id') - differentiated_biomaterial_id = row.get('differentiated_cell_line.biomaterial_core.biomaterial_id') - library_preparation_protocol_id = row.get('library_preparation_protocol.protocol_core.protocol_id') + label = row['library_preparation.label'] + if differentiated: + differentiated_biomaterial_label = row.get('differentiated_product.label') + else: + differentiated_biomaterial_label = row.get('undifferentiated_product.label') + library_preparation_protocol_id = row.get('library_preparation.library_preparation_protocol_id') # Check if required fields are null - if pd.isnull(library_preparation_id): + if pd.isnull(label): errors.append("Library Preparation ID cannot be null in any row of the Library Preparation sheet.") # raise MissingMandatoryFieldError("Library Preparation ID cannot be null in any row.") - if pd.isnull(dissociation_protocol_id): - errors.append("Dissociation Protocol ID cannot be null in any row of 
the Library Preparation sheet.") - # raise MissingMandatoryFieldError("Dissociation Protocol ID cannot be null in any row.") - if pd.isnull(differentiated_biomaterial_id): - errors.append("Differentiated Cell Line ID cannot be null in any row of the Library Preparation sheet.") - # raise MissingMandatoryFieldError("Differentiated Cell Line ID cannot be null in any row.") + if pd.isnull(differentiated_biomaterial_label): + if differentiated: + errors.append( + "Differentiated Cell Line ID cannot be null in any row of the Library Preparation sheet.") + # raise MissingMandatoryFieldError("Differentiated Cell Line ID cannot be null in any row.") + else: + errors.append( + "Undifferentiated Cell Line ID cannot be null in any row of the Library Preparation sheet.") + # raise MissingMandatoryFieldError("Differentiated Cell Line ID cannot be null in any row.") if pd.isnull(library_preparation_protocol_id): errors.append( "Library Preparation Protocol ID cannot be null in any row of the Library Preparation sheet.") @@ -949,10 +1107,9 @@ def parse_library_preparations(self, # Create LibraryPreparation objects from filtered DataFrame rows library_preparations.append( LibraryPreparation( - biomaterial_id=library_preparation_id, + biomaterial_id=label, protocol_id=library_preparation_protocol_id, - dissociation_protocol_id=dissociation_protocol_id, - differentiated_biomaterial_id=differentiated_biomaterial_id, + differentiated_biomaterial_id=differentiated_biomaterial_label, average_fragment_size=row.get('library_preparation.average_fragment_size'), input_amount_value=row.get('library_preparation.input_amount_value'), input_amount_unit=row.get('library_preparation.input_amount_unit'), @@ -994,9 +1151,9 @@ def parse_sequencing_files(self, # Check if the required column exists required_columns = [ - 'sequence_file.file_core.file_name', - 'library_preparation.biomaterial_core.biomaterial_id', - 'sequencing_protocol.protocol_core.protocol_id', + 'sequence_file.label', + 
'library_preparation.label', + 'sequence_file.extension', 'sequence_file.read_index' ] @@ -1008,15 +1165,15 @@ def parse_sequencing_files(self, return [], df # Filter rows where file_name is not null - df = df[df['sequence_file.file_core.file_name'].notna()] + df = df[df['sequence_file.label'].notna()] df = df.map(lambda x: None if isinstance(x, float) and (np.isnan(x) or not np.isfinite(x)) else x) # Define columns to check for values starting with 'ABC' or 'XYZ' - cols_to_check = ['sequence_file.file_core.file_name'] + cols_to_check = ['sequence_file.label'] # Create a mask to filter rows where any of the specified columns start with 'ABC' or 'XYZ' mask = df[cols_to_check].apply(lambda x: ~x.astype(str).str.startswith( ('FILL OUT INFORMATION BELOW THIS ROW', 'The name of the file.', 'Include the file extension in the file name. For example: R1.fastq.gz; codebook.json', - 'sequence_file.file_core.file_name'))).all(axis=1) + 'sequence_file.label'))).all(axis=1) # Apply the mask to filter out rows df_filtered = df[mask] @@ -1024,9 +1181,8 @@ def parse_sequencing_files(self, sequencing_files = [] for _, row in df_filtered.iterrows(): - file_name = row['sequence_file.file_core.file_name'] - library_preparation_id = row.get('library_preparation.biomaterial_core.biomaterial_id') - sequencing_protocol_id = row.get('sequencing_protocol.protocol_core.protocol_id') + file_name = row['sequence_file.label'] + library_preparation_id = row.get('library_preparation.label') read_index = row.get('sequence_file.read_index') # Check if required fields are null @@ -1036,9 +1192,6 @@ def parse_sequencing_files(self, if pd.isnull(library_preparation_id): errors.append("Library Preparation ID cannot be null in any row of the Sequencing File sheet..") # raise MissingMandatoryFieldError("Library Preparation ID cannot be null in any row.") - if pd.isnull(sequencing_protocol_id): - errors.append("Sequencing Protocol ID cannot be null in any row of the Sequencing File sheet..") - # raise 
def parse_expression_alteration(self, sheet_name, action, errors):
    """
    Parse expression-alteration strategies from `sheet_name`, one strategy per row.

    For datasets whose expression-alteration tab exists but is empty
    (e.g., UCSF), an empty list is returned without reporting an error.

    Parameters:
        sheet_name (str): Name of the sheet containing expression alteration rows.
        action (str): Submission action (ADD, MODIFY, DELETE).
        errors (list): Accumulator for validation / parsing error messages.

    Returns:
        tuple: (list of ExpressionAlterationStrategy, filtered DataFrame),
               or ([], None) when the sheet itself cannot be read.
    """
    try:
        df = self.input_file_to_data_frames(sheet_name=sheet_name, action=action)
    except Exception as e:
        errors.append(f"Missing sheet '{sheet_name}': {e}")
        return [], None

    # An empty tab is a legitimate state (e.g., UCSF datasets): no error.
    if df.empty:
        return [], df

    # Strip header whitespace BEFORE checking for the required column;
    # otherwise a padded header like ' expression_alteration.label ' would
    # be treated as missing.
    df.columns = df.columns.str.strip()

    required_columns = ['expression_alteration.label']
    missing_columns = [col for col in required_columns if col not in df.columns]
    if missing_columns:
        # Fix: this error was previously unreachable — an earlier
        # `if df.empty or 'expression_alteration.label' not in df.columns`
        # returned silently whenever the column was absent. A populated
        # sheet lacking the label column is now reported to the caller.
        errors.append(
            f"The following required columns are missing in the Expression Alteration Strategy sheet: {', '.join(missing_columns)}")
        return [], df

    # Keep only rows that actually carry a strategy label.
    df = df[df['expression_alteration.label'].notna()]
    # Normalize NaN/inf floats to None so downstream consumers see nulls.
    df = df.map(lambda x: None if isinstance(x, float) and (np.isnan(x) or not np.isfinite(x)) else x)

    # Boilerplate rows shipped with the spreadsheet template; drop them.
    unwanted_patterns = (
        'FILL OUT INFORMATION BELOW THIS ROW',
        'A unique ID for the gene expression alteration instance..',
        'ID should have no spaces. For example: JAXPE0001_MEIS1, MSKKI119_MEF2C, NWU_AID'
    )
    mask = df['expression_alteration.label'].astype(str).str.startswith(unwanted_patterns)
    df_filtered = df[~mask]

    expression_alterations = []
    for _, row in df_filtered.iterrows():
        expression_alterations.append(
            ExpressionAlterationStrategy(
                expression_alteration_id=row.get('expression_alteration.label'),
                parent_protocol_id=row.get('expression_alteration.parent_protocol_id'),
                allele_specific=row.get('expression_alteration.genes.allele_specific'),
                altered_gene_symbol=row.get('expression_alteration.genes.altered_gene_symbol'),
                target_gene_hgnc_id=row.get('expression_alteration.genes.target_gene_hgnc_id'),
                targeted_genomic_region=row.get('expression_alteration.genes.targeted_genomic_region'),
                expected_alteration_type=row.get('expression_alteration.genes.expected_alteration_type'),
                editing_strategy=row.get('expression_alteration.genes.editing_strategy'),
                altered_locus=row.get('expression_alteration.genes.altered_locus'),
                guide_sequence=row.get('expression_alteration.genes.guide_sequence'),
                method=row.get('expression_alteration.method'),
                id=row.get('Id')
            )
        )

    return expression_alterations, df_filtered

def find_sheet_name(tab_names, candidates):
    """
    Return the first entry of `candidates` that appears in `tab_names`,
    or None when no candidate matches.

    NOTE(review): declared without `self`; if this sits inside the parser
    class it should be decorated @staticmethod (and callers must use
    `self.`/class access) — confirm against the full file. Currently it is
    unused: parse_expression_alteration_with_genes inlines the same lookup.
    """
    for candidate in candidates:
        if candidate in tab_names:
            return candidate
    return None

def parse_expression_alteration_with_genes(self, strategy_sheet, action, errors):
    """
    Parse a pooled expression-alteration strategy (MSK-style pooled datasets).

    The main tab is expected to hold a single strategy row (its gene-related
    fields contain 'various'); every row of the gene-level tab
    ('expression_alteration_genes' / 'Expression alteration - Genes') is then
    attached to that one strategy.

    Parameters:
        strategy_sheet (str): Name of the main expression alteration sheet.
        action (str): Submission action (ADD, MODIFY, DELETE).
        errors (list): Accumulator for validation / parsing error messages.

    Returns:
        Tuple[List[ExpressionAlterationStrategy], pd.DataFrame]:
            A single-element strategy list (with `genes` populated) and a
            flattened strategy+gene DataFrame used later for Excel output;
            ([], <df>) on any validation failure, ([], None) if the main
            sheet is unreadable.
    """
    try:
        df = self.input_file_to_data_frames(sheet_name=strategy_sheet, action=action)
    except Exception as e:
        errors.append(f"Missing sheet '{strategy_sheet}': {e}")
        return [], None

    # Strip header whitespace before checking the required column, so a
    # padded header is not misreported as missing.
    if not df.empty:
        df.columns = df.columns.str.strip()
    if df.empty or 'expression_alteration.label' not in df.columns:
        errors.append("Expression alteration sheet is empty or missing required column.")
        return [], df

    df = df[df['expression_alteration.label'].notna()]
    # DataFrame.map replaces the deprecated applymap (pandas >= 2.1) and
    # matches the sibling parser's NaN/inf -> None normalization.
    df = df.map(lambda x: None if isinstance(x, float) and (np.isnan(x) or not np.isfinite(x)) else x)

    unwanted_patterns = (
        'FILL OUT INFORMATION BELOW THIS ROW',
        'A unique ID for the gene expression alteration instance..',
        'ID should have no spaces.'
    )
    mask = df['expression_alteration.label'].astype(str).str.startswith(unwanted_patterns)
    df_filtered = df[~mask]

    if df_filtered.empty:
        errors.append("No valid expression alteration strategy rows found.")
        return [], df_filtered

    # Pooled format: exactly one strategy row (with 'various' gene info) is
    # expected; if several are present only the first is used.
    strategy_row = df_filtered.iloc[0]
    label = strategy_row.get('expression_alteration.label')

    # Locate and load the gene-level tab.
    try:
        available_tabs = self.list_sheets()
        gene_sheet_name = next(
            (name for name in ['expression_alteration_genes', 'Expression alteration - Genes'] if name in available_tabs),
            None
        )
        if not gene_sheet_name:
            raise ValueError("No gene-level sheet found for pooled expression alterations.")

        gene_df = self.input_file_to_data_frames(sheet_name=gene_sheet_name, action=action)
        gene_df.columns = gene_df.columns.str.strip()
        gene_df = gene_df[gene_df['expression_alteration.genes.altered_gene_symbol'].notna()]
        gene_df = gene_df.map(lambda x: None if isinstance(x, float) and (np.isnan(x) or not np.isfinite(x)) else x)

    except Exception as e:
        errors.append(f"Missing or unreadable gene-level sheet: {e}")
        return [], df_filtered

    # Convert gene rows to dicts and build the flattened records that are
    # later written back into Excel.
    genes = []
    flattened_records = []
    for _, gene_row in gene_df.iterrows():
        gene_data = {
            'allele_specific': gene_row.get('expression_alteration.genes.allele_specific'),
            'altered_gene_symbol': gene_row.get('expression_alteration.genes.altered_gene_symbol'),
            'target_gene_hgnc_id': gene_row.get('expression_alteration.genes.target_gene_hgnc_id'),
            'targeted_genomic_region': gene_row.get('expression_alteration.genes.targeted_genomic_region'),
            'expected_alteration_type': gene_row.get('expression_alteration.genes.expected_alteration_type'),
            'editing_strategy': gene_row.get('expression_alteration.genes.editing_strategy'),
            'altered_locus': gene_row.get('expression_alteration.genes.altered_locus'),
            'guide_sequence': gene_row.get('expression_alteration.genes.guide_sequence')
        }
        genes.append(gene_data)

        flattened_records.append({
            'expression_alteration.label': label,
            'expression_alteration.parent_protocol_id': strategy_row.get('expression_alteration.parent_protocol_id'),
            'expression_alteration.method': strategy_row.get('expression_alteration.method'),
            'expression_alteration.genes.allele_specific': gene_data['allele_specific'],
            'expression_alteration.genes.altered_gene_symbol': gene_data['altered_gene_symbol'],
            'expression_alteration.genes.target_gene_hgnc_id': gene_data['target_gene_hgnc_id'],
            'expression_alteration.genes.targeted_genomic_region': gene_data['targeted_genomic_region'],
            'expression_alteration.genes.expected_alteration_type': gene_data['expected_alteration_type'],
            'expression_alteration.genes.editing_strategy': gene_data['editing_strategy'],
            'expression_alteration.genes.altered_locus': gene_data['altered_locus'],
            'expression_alteration.genes.guide_sequence': gene_data['guide_sequence'],
            'Id': strategy_row.get('Id')
        })

    if not genes:
        errors.append("No valid gene rows found in the gene tab.")
        return [], df_filtered

    # Single strategy object carrying the full gene list.
    strategy = ExpressionAlterationStrategy(
        expression_alteration_id=label,
        parent_protocol_id=strategy_row.get('expression_alteration.parent_protocol_id'),
        method=strategy_row.get('expression_alteration.method'),
        id=strategy_row.get('Id'),
        genes=genes
    )

    expression_alterations_df = pd.DataFrame(flattened_records)
    print("Parsed expression alterations:", len(flattened_records))
    return [strategy], expression_alterations_df
""" - cell_lines, cell_lines_df, parent_cell_line_name = self.parse_cell_lines(sheet_name, action, errors) - return cell_lines, cell_lines_df, parent_cell_line_name + cell_lines, cell_lines_df, parent_cell_line_names = self.parse_cell_lines(sheet_name, action, errors, context) + return cell_lines, cell_lines_df, parent_cell_line_names def get_differentiated_cell_lines(self, sheet_name, @@ -1215,6 +1458,7 @@ def get_undifferentiated_cell_lines(self, def get_library_preparations(self, sheet_name, + differentiated, action, errors): """ @@ -1232,7 +1476,7 @@ def get_library_preparations(self, list A list of LibraryPreparation objects parsed from the specified sheet. """ - library_preparations, df_filtered = self.parse_library_preparations(sheet_name, + library_preparations, df_filtered = self.parse_library_preparations(sheet_name, differentiated, action, errors) return library_preparations, df_filtered @@ -1261,6 +1505,24 @@ def get_sequencing_files(self, def get_expression_alterations(self, sheet_name, action, - errors): - expression_alterations, df_filtered = self.parse_expression_alteration(sheet_name, action, errors) - return expression_alterations, df_filtered + errors, + context=None): + """ + Retrieves parsed expression alterations from the appropriate sheet(s) in the Excel file. + + Parameters: + sheet_name (str): Name of the main expression alteration sheet. + action (str): Submission action (ADD, MODIFY, DELETE). + errors (list): A list to collect validation or parsing errors. + context (str, optional): Ingestion context to distinguish between formats. + e.g., 'pooled_differentiated' for MSK-style pooled datasets. + + Returns: + Tuple[List[ExpressionAlterationStrategy], DataFrame]: Parsed strategies and cleaned DataFrame. 
+ """ + if context == 'pooled_differentiated': + print("Using pooled_differentiated parsing: augmenting expression alterations with gene-specific info " + "from 'expression_alteration_genes' tab.") + return self.parse_expression_alteration_with_genes(sheet_name, action, errors) + else: + return self.parse_expression_alteration(sheet_name, action, errors)