Source code for kbbq.benchmark

"""
Utilities for benchmarking calibration methods.
"""

from kbbq import compare_reads
from kbbq import read
from kbbq import recalibrate
import pysam
import numpy as np
import pathlib
import contextlib
import gzip

[docs]def get_ref_dict(reffilename): fasta = pysam.FastaFile(reffilename) ref = {chrom : np.array(list(fasta.fetch(reference = chrom)), dtype = np.unicode) for chrom in fasta.references} return ref
[docs]def get_var_sites(vcf): vcf = pysam.VariantFile(vcf) d = dict() for record in vcf: for i in range(record.start, record.stop, 1): d.setdefault(record.chrom, list()).append(i) return d
[docs]def get_bed_dict(refdict, bedfh): beddict = {chrom: np.zeros(len(refdict[chrom]), dtype = np.bool) for chrom in refdict.keys()} for bedline in pysam.tabix_iterator(bedfh, parser = pysam.asBed()): beddict[bedline.contig][bedline.start:bedline.end] = True #end is 1 past the actual end, so this slice should work properly return beddict
[docs]def get_full_skips(refdict, var_sites, bedfh = None): skips = {chrom: np.zeros(len(refdict[chrom]), dtype = np.bool) for chrom in refdict.keys()} for chrom in skips.keys(): variable_positions = np.array(var_sites[chrom], dtype = np.int) skips[chrom][variable_positions] = True if bedfh is not None: beddict = get_bed_dict(refdict, bedfh) for chrom in skips.keys(): skips[chrom][~beddict[chrom]] = True #skip anything not in the BED return skips
[docs]def get_bam_readname(read): """ Given a bam read, get a canonicalized name. The name is the query name + a suffix based on whether it is first or second in pair. """ suffix = ("/2" if read.is_read2 else "/1") return read.query_name + suffix
[docs]def get_fastq_readname(read): """ Given a fastq read, get a canonicalized name. This is based on whether the read is first in pair. """ return read.name.split(sep = '_')[0]
[docs]def get_error_dict(bamfile, refdict, fullskips): """ Finds errors given an authoritative BAM, ref, and sites to skip. The returned dict has readname keys and tuple(np.array, np.array) of bools as values. """ edict = dict() for rd, original in recalibrate.yield_reads(bamfile): name = rd.canonical_name() e, s = compare_reads.find_read_errors(original, refdict, fullskips) edict[name] = (e,s) return edict
[docs]def calculate_q(numerrs, numtotal): """ Calculate Actual Q and Predicted Q given an array containing counts of errors and an array containing total counts for each quality score. The index of the returned array represents the predicted quality score, the value represents the actual quality score or the number of bases. """ nonzero = (numtotal != 0) p = np.true_divide(numerrs[nonzero], numtotal[nonzero]) q = compare_reads.p_to_q(p) actual_q = np.zeros(len(numtotal), dtype = np.int) actual_q[nonzero] = q return actual_q, numtotal
[docs]def benchmark_files(bamfh, ref, fullskips, use_oq = False, fastqfh = None, namedelimiter = '_'): """ Benchmark the given files and return the actual and predicted Q. If fastqfh is None (the default), reads from the BAM will be directly benchmarked. Otherwise, names from the fastq files will be used to look up """ if fastqfh is not None: edict = get_error_dict(bamfh, ref, fullskips) readpairs = recalibrate.yield_reads(fastqfh, namedelimiter = namedelimiter) error_finder = lambda x, y: edict.get(x.canonical_name()) #x=ReadData, y=read else: readpairs = recalibrate.yield_reads(bamfh, use_oq = use_oq) error_finder = lambda x, y: compare_reads.find_read_errors(y, ref, fullskips) error_counter = np.zeros(compare_reads.RescaledNormal.maxscore + 1, dtype = np.int64) q_counter = np.zeros(compare_reads.RescaledNormal.maxscore + 1, dtype = np.int64) for rd, original in readpairs: rd.errors, rd.skips = error_finder(rd, original) qe, qv = rd.get_q_errors() np.add.at(error_counter, qe, 1) np.add.at(q_counter, qv, 1) return calculate_q(error_counter, q_counter)
[docs]@contextlib.contextmanager def open_bedfile(bedfile): """ Attempt to open the given bedfile and return a filehandle. If bedfile is None, None will be returned. This is a context manager; the file will be closed once it goes out of scope. """ if bedfile is None: bedfh = None else: if pathlib.Path(bedfile).suffix == '.gz': bedfh = gzip.open(bedfile, 'r') else: bedfh = open(str(bedfile), mode = 'r') try: yield bedfh finally: if bedfh is not None: bedfh.close()
[docs]def benchmark(bamfile, fafile, vcffile, fastqfile = None, label = None, use_oq = False, bedfile = None, namedelimiter = '_'): """ Perform the benchmark and print the results to stdout. If a fastqfile is not given, the benchmark will be taken from the reads in the BAM file. If a fastqfile is given, it should have the same read names as the reads in the BAM file. The names will then be used to look up where errors occur in the reads. """ bam = pysam.AlignmentFile(bamfile, 'r') ref = get_ref_dict(fafile) var_sites = get_var_sites(vcffile) with open_bedfile(bedfile) as bedfh: fullskips = get_full_skips(ref, var_sites, bedfh) with pysam.AlignmentFile(bamfile, 'r') as bam: if fastqfile is not None: # bamfh, ref, fullskips, use_oq = False, fastqfh = None, namedelimiter = '_' with pysam.FastxFile(fastqfile) as fastqfh: actual_q, nbases = benchmark_files(bam, ref, fullskips, use_oq, fastqfh, namedelimiter) label = (fastqfile if label is None else label) else: actual_q, nbases = benchmark_files(bam, ref, fullskips, use_oq) label = (bamfile if label is None else label) print_benchmark(actual_q, label, nbases)