diff --git a/scripts/extract_tracts.py b/scripts/extract_tracts.py index d7327e6..6487acc 100755 --- a/scripts/extract_tracts.py +++ b/scripts/extract_tracts.py @@ -45,38 +45,51 @@ --compress-output Compress output dosage, hapcount, and VCF files (default: False). """ + def parse_vcf_filepath(vcf_file_path): if not os.path.exists(vcf_file_path): raise FileNotFoundError(f"File not found: {vcf_file_path}") + if os.path.getsize(vcf_file_path) == 0: + err_msg = f"The provided vcf file, {vcf_file_path}, was empty" + logger.critical(err_msg) + raise ValueError(err_msg) + zipped = False path, filename = os.path.split(vcf_file_path) # check if the file is compressed (ends with '.gz') - if filename.endswith('.vcf.gz'): + if filename.endswith(".vcf.gz"): zipped = True - prefix = os.path.splitext(os.path.splitext(filename)[0])[0] # remove '.vcf.gz' extension - elif filename.endswith('.vcf'): - prefix = os.path.splitext(filename)[0] # remove '.vcf' extension + prefix = os.path.splitext(os.path.splitext(filename)[0])[ + 0 + ] # remove '.vcf.gz' extension + elif filename.endswith(".vcf"): + prefix = os.path.splitext(filename)[0] # remove '.vcf' extension else: # File has an unexpected extension - raise ValueError(f"Unexpected file extension: {filename}. VCF file must end in .vcf or .vcf.gz") + raise ValueError( + f"Unexpected file extension: {filename}. VCF file must end in .vcf or .vcf.gz" + ) - return { - 'path': path, - 'prefix': prefix, - 'zipped': zipped - } + return {"path": path, "prefix": prefix, "zipped": zipped} -def extract_tracts(vcf=str, msp=str, num_ancs=int, output_dir=None, output_vcf=None, compress_output=None): +def extract_tracts( + vcf=str, + msp=str, + num_ancs=int, + output_dir=None, + output_vcf=None, + compress_output=None, +): # Housekeeping checks. if not os.path.exists(vcf): raise ValueError(f"The path '{vcf}' does not exist.") if not os.path.exists(msp): - raise ValueError(f"The path '{msp}' does not exist.") + raise ValueError(f"The path '{msp}' does not exist.") if not isinstance(num_ancs, int): raise ValueError("The 'num_ancs' must be an integer.") if output_dir is not None: @@ -85,13 +98,19 @@ def extract_tracts(vcf=str, msp=str, num_ancs=int, output_dir=None, output_vcf=N if not isinstance(output_vcf, bool): raise ValueError("The 'output_vcf' must be a boolean value (True or False).") if not isinstance(compress_output, bool): - raise ValueError("The 'compress_output' must be a boolean value (True or False).") - if compress_output==True: + raise ValueError( + "The 'compress_output' must be a boolean value (True or False)." + ) + if compress_output == True: logger.info("Files will be gzip compressed (not bgzip)") vcf_info = parse_vcf_filepath(vcf) - vcf_path, vcf_prefix, zipped = vcf_info['path'], vcf_info['prefix'], vcf_info['zipped'] - + vcf_path, vcf_prefix, zipped = ( + vcf_info["path"], + vcf_info["prefix"], + vcf_info["zipped"], + ) + # Log version number logger.info("# Running script version : %s", __version__) logger.info("# VCF File : %s", vcf) @@ -109,13 +128,21 @@ def extract_tracts(vcf=str, msp=str, num_ancs=int, output_dir=None, output_vcf=N logger.info("Creating output files for %d ancestries", num_ancs) for i in range(num_ancs): output_files[f"dos{i}"] = f"{output_path}.anc{i}.dosage.txt{file_extension}" - output_files[f"ancdos{i}"] = f"{output_path}.anc{i}.hapcount.txt{file_extension}" + output_files[f"ancdos{i}"] = ( + f"{output_path}.anc{i}.hapcount.txt{file_extension}" + ) if output_vcf: output_files[f"vcf{i}"] = f"{output_path}.anc{i}.vcf{file_extension}" - with open(msp) as mspfile, gzip.open(vcf, "rt") if zipped else open(vcf) as vcf, contextlib.ExitStack() as stack: + with open(msp) as mspfile, ( + gzip.open(vcf, "rt") if zipped else open(vcf) + ) as vcf, contextlib.ExitStack() as stack: files = { - fname: stack.enter_context(gzip.open(output_file, "wt") if compress_output else open(output_file, "w")) + fname: stack.enter_context( + gzip.open(output_file, "wt") + if compress_output + else open(output_file, "w") + ) for fname, output_file in output_files.items() } vcf_header = "" @@ -123,7 +150,7 @@ def extract_tracts(vcf=str, msp=str, num_ancs=int, output_dir=None, output_vcf=N logger.info("Iterating through VCF file") for line in vcf: - skip_line=False + skip_line = False if line.startswith("##"): if output_vcf: vcf_header += line @@ -131,7 +158,12 @@ def extract_tracts(vcf=str, msp=str, num_ancs=int, output_dir=None, output_vcf=N elif line.startswith("#"): if output_vcf: vcf_header += line - anc_header = "\t".join([line.strip("# ").split("\t", 9)[item] for item in [0, 1, 2, 3, 4, 9] ]) + anc_header = "\t".join( + [ + line.strip("# ").split("\t", 9)[item] + for item in [0, 1, 2, 3, 4, 9] + ] + ) for i in range(num_ancs): # Write header into each output vcf files[f"dos{i}"].write(anc_header) @@ -164,9 +196,9 @@ def extract_tracts(vcf=str, msp=str, num_ancs=int, output_dir=None, output_vcf=N # saves the current line until out of the window, then checks next line. VCF and window switches file should be in incremental order. while not (row[0] == window[0] and (window[1] <= pos < window[2])): - if row[0] == window[0] and window[1]> pos: - skip_line=True #Skip VCF line - break #Break out of msp scanning because the VCF position is still before the windows start + if row[0] == window[0] and window[1] > pos: + skip_line = True # Skip VCF line + break # Break out of msp scanning because the VCF position is still before the windows start ancs = mspfile.readline() if ancs.startswith("#"): # skip the header lines continue @@ -176,21 +208,25 @@ def extract_tracts(vcf=str, msp=str, num_ancs=int, output_dir=None, output_vcf=N ancs_entry = ancs.strip().split("\t", 6) calls = ancs_entry[6].split("\t") window = (ancs_entry[0], int(ancs_entry[1]), int(ancs_entry[2])) - if row[0] == window[0] and window[1]> pos: - skip_line=True #Skip VCF line - break #Break out of msp scanning because the VCF position is before the windows start + if row[0] == window[0] and window[1] > pos: + skip_line = True # Skip VCF line + break # Break out of msp scanning because the VCF position is before the windows start if skip_line: - logger.info(f"VCF position, {pos} is not in an msp window, skipping site") - continue # Skip VCF site and continue to next site + logger.info( + f"VCF position, {pos} is not in an msp window, skipping site" + ) + continue # Skip VCF site and continue to next site # index by the number of individuals in the VCF file, should be half the number in the calls file for i, geno in enumerate(genos): - geno_parts = geno.split(":")[0].split("|") # assert incase eagle leaves some genos unphased - geno_a,geno_b = map(str, geno_parts[:2]) - call_a = str(calls[2*i]) - call_b = str(calls[2*i + 1]) - - counts = {anc: 0 for anc in range(num_ancs)} + geno_parts = geno.split(":")[0].split( + "|" + ) # assert incase eagle leaves some genos unphased + geno_a, geno_b = map(str, geno_parts[:2]) + call_a = str(calls[2 * i]) + call_b = str(calls[2 * i + 1]) + + counts = {anc: 0 for anc in range(num_ancs)} anc_counts = {anc: 0 for anc in range(num_ancs)} for j in range(num_ancs): if output_vcf: @@ -231,18 +267,38 @@ def extract_tracts(vcf=str, msp=str, num_ancs=int, output_dir=None, output_vcf=N logger.info("Finished extracting tracts per %d ancestries", num_ancs) + if __name__ == "__main__": parser = argparse.ArgumentParser() - parser.add_argument("--vcf",required=True,help="Path to phased genotypes, VCF file (*.vcf or *.vcf.gz)") - parser.add_argument("--msp",required=True,help="Path to ancestry calls, MSP file (*.msp or *.msp.tsv)") - parser.add_argument("--num-ancs",type=int,required=True, - help="Number of ancestral populations within the VCF file.") - parser.add_argument("--output-dir", help="Directory for output files. Directory must already exist.") - parser.add_argument("--output-vcf", help="Boolean. If ancestry-specific VCF files need to be generated", - action="store_true") - parser.add_argument("--compress-output", help="gzip (not bgzip) all output files", - action="store_true") + parser.add_argument( + "--vcf", + required=True, + help="Path to phased genotypes, VCF file (*.vcf or *.vcf.gz)", + ) + parser.add_argument( + "--msp", + required=True, + help="Path to ancestry calls, MSP file (*.msp or *.msp.tsv)", + ) + parser.add_argument( + "--num-ancs", + type=int, + required=True, + help="Number of ancestral populations within the VCF file.", + ) + parser.add_argument( + "--output-dir", help="Directory for output files. Directory must already exist." + ) + parser.add_argument( + "--output-vcf", + help="Boolean. If ancestry-specific VCF files need to be generated", + action="store_true", + ) + parser.add_argument( + "--compress-output", + help="gzip (not bgzip) all output files", + action="store_true", + ) args = parser.parse_args() extract_tracts(**vars(args)) - diff --git a/scripts/extract_tracts_flare.py b/scripts/extract_tracts_flare.py index f0bf4b9..1b64f9a 100755 --- a/scripts/extract_tracts_flare.py +++ b/scripts/extract_tracts_flare.py @@ -47,6 +47,7 @@ """ + def parse_vcf_filepath(vcf_file_path): if not os.path.exists(vcf_file_path): @@ -56,26 +57,31 @@ def parse_vcf_filepath(vcf_file_path): path, filename = os.path.split(vcf_file_path) # check if the file is compressed (ends with '.gz') - if filename.endswith('.vcf.gz'): + if filename.endswith(".vcf.gz"): zipped = True - prefix = os.path.splitext(os.path.splitext(filename)[0])[0] # remove '.vcf.gz' extension - elif filename.endswith('.vcf'): - prefix = os.path.splitext(filename)[0] # remove '.vcf' extension + prefix = os.path.splitext(os.path.splitext(filename)[0])[ + 0 + ] # remove '.vcf.gz' extension + elif filename.endswith(".vcf"): + prefix = os.path.splitext(filename)[0] # remove '.vcf' extension else: # File has an unexpected extension - raise ValueError(f"Unexpected file extension: {filename}. VCF file must end in .vcf or .vcf.gz") + raise ValueError( + f"Unexpected file extension: {filename}. VCF file must end in .vcf or .vcf.gz" + ) + + return {"path": path, "prefix": prefix, "zipped": zipped} - return { - 'path': path, - 'prefix': prefix, - 'zipped': zipped - } -def extract_tracts_flare(vcf=str, num_ancs=int, output_dir=None, output_vcf=None, compress_output=None): +def extract_tracts_flare( + vcf=str, num_ancs=int, output_dir=None, output_vcf=None, compress_output=None +): # Print version of the script - logger.info(f"# This is version {__version__} of the extract_tracts_flare.py script.") - + logger.info( + f"# This is version {__version__} of the extract_tracts_flare.py script." + ) + # Housekeeping checks. if not os.path.exists(vcf): raise ValueError(f"The path '{vcf}' does not exist.") @@ -87,12 +93,18 @@ def extract_tracts_flare(vcf=str, num_ancs=int, output_dir=None, output_vcf=None if not isinstance(output_vcf, bool): raise ValueError("The 'output_vcf' must be a boolean value (True or False).") if not isinstance(compress_output, bool): - raise ValueError("The 'compress_output' must be a boolean value (True or False).") - if compress_output==True: - logger.info("Files will be gzip compressed (not bgzip)") - + raise ValueError( + "The 'compress_output' must be a boolean value (True or False)." + ) + if compress_output == True: + logger.info("Files will be gzip compressed (not bgzip)") + vcf_info = parse_vcf_filepath(vcf) - vcf_path, vcf_prefix, zipped = vcf_info['path'], vcf_info['prefix'], vcf_info['zipped'] + vcf_path, vcf_prefix, zipped = ( + vcf_info["path"], + vcf_info["prefix"], + vcf_info["zipped"], + ) # Log version number logger.info("# Running script version : %s", __version__) @@ -101,7 +113,7 @@ def extract_tracts_flare(vcf=str, num_ancs=int, output_dir=None, output_vcf=None logger.info("# VCF File is compressed? : %s", zipped) logger.info("# Number of Ancestries in VCF : %d", num_ancs) logger.info("# Output Directory : %s", output_dir) - + output_files = {} output_path = f"{os.path.join(output_dir, vcf_prefix) if output_dir else os.path.join(vcf_path, vcf_prefix)}" @@ -111,35 +123,48 @@ def extract_tracts_flare(vcf=str, num_ancs=int, output_dir=None, output_vcf=None logger.info("Creating output files for %d ancestries", num_ancs) for i in range(num_ancs): if output_vcf: - output_files[f"vcf{i}"] = f"{output_path}.anc{i}.vcf{file_extension}" + output_files[f"vcf{i}"] = f"{output_path}.anc{i}.vcf{file_extension}" output_files[f"dos{i}"] = f"{output_path}.anc{i}.dosage.txt{file_extension}" - output_files[f"ancdos{i}"] = f"{output_path}.anc{i}.hapcount.txt{file_extension}" + output_files[f"ancdos{i}"] = ( + f"{output_path}.anc{i}.hapcount.txt{file_extension}" + ) logger.info("Opening input and output files for reading and writing") - with gzip.open(vcf, "rt") if zipped else open(vcf) as vcf, contextlib.ExitStack() as stack: + with ( + gzip.open(vcf, "rt") if zipped else open(vcf) + ) as vcf, contextlib.ExitStack() as stack: files = { - fname: stack.enter_context(gzip.open(output_file, "wt") if compress_output else open(output_file, "w")) + fname: stack.enter_context( + gzip.open(output_file, "wt") + if compress_output + else open(output_file, "w") + ) for fname, output_file in output_files.items() } vcf_header = "" - + logger.info("Iterating through VCF file") for line in vcf: if line.startswith("##"): if output_vcf: - vcf_header += line + vcf_header += line continue elif line.startswith("#"): if output_vcf: vcf_header += line - anc_header = "\t".join([ line.strip("# ").split("\t", 9)[item] for item in [0, 1, 2, 3, 4, 9] ]) + anc_header = "\t".join( + [ + line.strip("# ").split("\t", 9)[item] + for item in [0, 1, 2, 3, 4, 9] + ] + ) for i in range(num_ancs): # Write header into each output vcf files[f"dos{i}"].write(anc_header) files[f"ancdos{i}"].write(anc_header) if output_vcf: files[f"vcf{i}"].write(vcf_header) - + if not line.startswith("#"): # Entry format is ['chrom', 'pos', 'id', 'ref', 'alt', 'qual', 'filter', 'info', 'format', 'genotypes'] row = line.strip().split("\t", 9) @@ -152,77 +177,97 @@ def extract_tracts_flare(vcf=str, num_ancs=int, output_dir=None, output_vcf=None # split each strand geno call apart from each other genos = row[9].split("\t") output_lines = {} - pop_genos = {} + # We are going to store every value as a list because then we can just join + # the strings right before writing and reduce the number of memory allocations + # from the string overhead for i in range(num_ancs): # Write entries into each output files' list - output_lines[f"dos{i}"] = dos_anc_out - output_lines[f"ancdos{i}"] = dos_anc_out + output_lines[f"dos{i}"] = [dos_anc_out] + output_lines[f"ancdos{i}"] = [dos_anc_out] if output_vcf: - output_lines[f"vcf{i}"] = vcf_out - - # Genotype data is of format GT1|GT2:ANC1:ANC2, where GT1:GT2 is the genotype, + output_lines[f"vcf{i}"] = [vcf_out] + + # Genotype data is of format GT1|GT2:ANC1:ANC2, where GT1:GT2 is the genotype, # and ANC1:ANC2 are the ancestry calls - for i,geno in enumerate(genos): - geno_parts = re.split('[|:]',geno) + for i, geno in enumerate(genos): + # We know what the string will look like each time so we can + # just call 2 split calls. + genotype, call_a, call_b = geno.split(":") + # We are going to need to convert call_a and call_b to integers + # for later comparision + call_a = int(call_a) + call_b = int(call_b) - geno_a, geno_b, call_a, call_b = map(str, geno_parts[:4]) + geno_a, geno_b = genotype.split("|") - counts = {anc: 0 for anc in range(num_ancs)} - anc_counts = {anc: 0 for anc in range(num_ancs)} + # # i think we can just replace these with counters + # counts = {anc: 0 for anc in range(num_ancs)} + # anc_counts = {anc: 0 for anc in range(num_ancs)} for j in range(num_ancs): + # case where both calls are for the same ancestry + is_match_a = call_a == j + is_match_b = call_b == j + + # We can add together boolean values to get ancestry haplotype count + anc_counts = is_match_a + is_match_b + + # We can do the same thing for the allele counts + allele_count_1 = is_match_a and geno_a == "1" + allele_count_2 = is_match_b and geno_b == "1" + + allele_counts = allele_count_1 + allele_count_2 + + # now we can add each of the values to our list + output_lines[f"dos{j}"].append(f"{allele_counts}") + output_lines[f"ancdos{j}"].append(f"{anc_counts}") + if output_vcf: - pop_genos[j] = "" - if call_a == str(j): - if output_vcf: - pop_genos[j]+= geno_a - anc_counts[j]+=1 - if geno_a == "1": - counts[j]+=1 - else: - if output_vcf: - pop_genos[j] += "." - - if call_b == str(j): - if output_vcf: - pop_genos[j] += "|" + geno_b - anc_counts[j]+=1 - if geno_b == "1": - counts[j]+=1 - else: - if output_vcf: - pop_genos[j] += "|." - - output_lines[f"dos{j}"] += "\t" + str(counts[j]) - output_lines[f"ancdos{j}"] += "\t" + str(anc_counts[j]) - if output_vcf: - output_lines[f"vcf{j}"] += "\t" + pop_genos[j] + geno_1 = geno_a if is_match_a else "." + geno_2 = geno_b if is_match_b else "." + output_lines[f"vcf{j}"].append(f"{geno_1}|{geno_2}") + + # We are going to iterate over the list of values we've collected for + # each ancestry and then join them and write to an output file for j in range(num_ancs): - output_lines[f"dos{j}"] += "\n" - output_lines[f"ancdos{j}"] += "\n" - files[f"dos{j}"].write(output_lines[f"dos{j}"]) - files[f"ancdos{j}"].write(output_lines[f"ancdos{j}"]) + dosage_output_line = "\t".join(output_lines[f"dos{j}"]) + ancdos_output_line = "\t".join(output_lines[f"ancdos{j}"]) + files[f"dos{j}"].write(f"{dosage_output_line}\n") + files[f"ancdos{j}"].write(f"{ancdos_output_line}\n") if output_vcf: - output_lines[f"vcf{j}"] += "\n" - files[f"vcf{j}"].write(output_lines[f"vcf{j}"]) - + vcf_output_lines = "\t".join(output_lines[f"vcf{j}"]) + files[f"vcf{j}"].write(f"{vcf_output_lines}\n") + logger.info("Finished extracting tracts for %d ancestries.", num_ancs) - + if __name__ == "__main__": parser = argparse.ArgumentParser() - parser.add_argument("--vcf",required=True,help="Path to VCF file (*.vcf or *.vcf.gz)") - parser.add_argument("--num-ancs",type=int,required=True, - help="Number of ancestral populations within the VCF file.") - parser.add_argument("--output-dir", help="Directory for output files. Directory must already exist.") - parser.add_argument("--output-vcf", help="Boolean. If ancestry-specific VCF files need to be generated", - action="store_true") - parser.add_argument("--compress-output", help="gzip (not bgzip) all output files", - action="store_true") + parser.add_argument( + "--vcf", required=True, help="Path to VCF file (*.vcf or *.vcf.gz)" + ) + parser.add_argument( + "--num-ancs", + type=int, + required=True, + help="Number of ancestral populations within the VCF file.", + ) + parser.add_argument( + "--output-dir", help="Directory for output files. Directory must already exist." + ) + parser.add_argument( + "--output-vcf", + help="Boolean. If ancestry-specific VCF files need to be generated", + action="store_true", + ) + parser.add_argument( + "--compress-output", + help="gzip (not bgzip) all output files", + action="store_true", + ) args = parser.parse_args() extract_tracts_flare(**vars(args)) - - logger.info("# Run complete") + logger.info("# Run complete")