# Source code for iss.modeller

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import logging

import numpy as np
from scipy import stats

from iss import util


def insert_size(template_length_dist, read_length):
    """Model the insert size distribution as a cumulative density function.

    Discards zero/negative template lengths and outliers (>= 2000 bp, a
    common upper bound for Illumina libraries), converts the remaining
    template lengths to insert sizes, then smooths the distribution with
    a 1D gaussian kernel density estimate evaluated on a 2000-point grid.

    Args:
        template_length_dist (list): List of template lengths from bam file.
        read_length (int): The length of the read.

    Returns:
        1darray: a cumulative density function
    """
    lengths = np.asarray(template_length_dist)
    # keep only positive, non-outlier template lengths
    lengths = lengths[(lengths > 0) & (lengths < 2000)]
    insert_sizes = lengths - (2 * read_length)  # convert to insert size
    bandwidth = 0.2 / np.std(insert_sizes, ddof=1)
    density = stats.gaussian_kde(insert_sizes, bw_method=bandwidth)
    grid = np.linspace(min(insert_sizes), max(insert_sizes), 2000)
    cdf = np.cumsum(density.evaluate(grid))
    return cdf / cdf[-1]
def divide_qualities_into_bins(qualities, n_bins=4):
    """Divide the raw quality scores into bins according to the mean
    phred quality of the sequence they come from.

    Args:
        qualities (list): raw count of all the phred scores and mean
            sequence quality. One inner list of (phred, mean) tuples
            per read.
        n_bins (int): number of bins to create (default: 4)

    Returns:
        list: a list of `n_bins` lists containing the binned quality scores
    """
    logger = logging.getLogger(__name__)
    logger.debug("Dividing qualities into mean clusters")
    bin_lists = [[] for _ in range(n_bins)]  # create list of `n_bins` lists
    # np.array_split (unlike np.split) also accepts values of n_bins
    # that do not evenly divide the 0-39 phred range
    ranges = np.array_split(np.arange(40), n_bins)
    for quality in qualities:
        mean = int(quality[0][1])  # mean is at 1 and same regardless of b pos
        # NOTE(review): a read whose truncated mean is exactly 40 matches
        # no bin and is silently dropped — confirm this is intended
        for which_array, array in enumerate(ranges):
            if mean in array:
                read = np.fromiter((q[0] for q in quality), float)
                bin_lists[which_array].append(read)
                break  # bins are disjoint; no need to scan the rest
    return bin_lists
def quality_bins_to_histogram(bin_lists):
    """Wrapper function to generate cdfs for each quality bin.

    Generate cumulative distribution functions for a number of mean
    quality bins.

    Args:
        bin_lists (list): list of lists containing raw counts of all
            phred scores, one inner list per mean-quality bin.

    Returns:
        list: a list of lists containing cumulative density functions.
            Bins with fewer than two reads yield an empty list.
    """
    logger = logging.getLogger(__name__)
    cdf_bins = []
    for i, qual_bin in enumerate(bin_lists):
        # a single read is not enough to model a distribution
        if len(qual_bin) > 1:
            logger.debug("Transposing matrix for mean cluster #%s" % i)
            # quals = np.asarray(qual_bin).T  # seems to make clunkier models
            quals = [q for q in zip(*qual_bin)]
            logger.debug("Modelling quality distribution for mean cluster #%s" % i)
            cdfs_list = raw_qualities_to_histogram(quals)
            cdf_bins.append(cdfs_list)
        else:
            # original message said "< 1" but bins of length 0 AND 1 are skipped
            logger.debug("Mean quality bin #%s of length <= 1. Skipping" % i)
            cdf_bins.append([])
    return cdf_bins
def raw_qualities_to_histogram(qualities):
    """Approximate the distribution of base quality at each position in
    a read using a pseudo 2d kernel density estimation.

    Generate cumulative distribution functions.

    Args:
        qualities (list): raw count of all phred scores, one inner
            iterable per position in the read.

    Returns:
        list: list of cumulative distribution functions. One cdf per
            base. The list has the size of the read length.
    """
    cdfs_list = []
    for q in qualities:
        np.seterrcall(util.nplog)
        with np.errstate(under="ignore", divide="call"):
            try:
                kde = stats.gaussian_kde(q, bw_method=0.2 / np.std(q, ddof=1))
            except np.linalg.LinAlgError:
                # np.linalg.LinAlgError is the public name; the private
                # np.linalg.linalg alias was removed in numpy 2.0
                # if np.std of array is 0, we modify the array slightly to be
                # able to divide by ~ np.std
                # this will print a FloatingPointError in DEBUG mode
                q = list(q)
                q[-1] += 1
                kde = stats.gaussian_kde(q, bw_method=0.2 / np.std(q, ddof=1))
        # evaluate the density over the full phred range (0-40)
        kde = kde.evaluate(range(41))
        cdf = np.cumsum(kde)
        cdf = cdf / cdf[-1]
        cdfs_list.append(cdf)
    return cdfs_list
def dispatch_subst(base, read, read_has_indels):
    """Return the x and y position of a substitution to be inserted in
    the substitution matrix.

    The substitution matrix is a 2D array of size 301 * 16. The x axis
    (301) corresponds to the position in the read, while the y axis (16)
    represents the match or substitution (see the dispatch dict in the
    function). Positions 0, 4, 8 and 12 are matches, other positions are
    substitutions. The size of the x axis is 301 because we haven't
    calculated the read length yet.

    Args:
        base (tuple): one base from an alignment object. According to
            the pysam documentation: an alignment is a list of tuples:
            aligned read (query) and reference positions. The parameter
            with_seq adds the ref sequence as the 3rd element of the
            tuples. Substitutions are lower-case.
        read (read): a read object, from which the alignment comes from
        read_has_indels (bool): a boolean flag to keep track if the read
            has an indel or not

    Returns:
        tuple: x and y position for incrementing the substitution matrix
            and a third element: True if an indel has been detected,
            False otherwise
    """
    dispatch_dict = {
        "AA": 0,
        "aT": 1,
        "aG": 2,
        "aC": 3,
        "TT": 4,
        "tA": 5,
        "tG": 6,
        "tC": 7,
        "CC": 8,
        "cA": 9,
        "cT": 10,
        "cG": 11,
        "GG": 12,
        "gA": 13,
        "gT": 14,
        "gC": 15,
    }
    query_pos = base[0]
    # lower-case reference base + upper-case query base forms the key
    dispatch_key = base[2] + read.seq[query_pos]
    try:
        substitution = dispatch_dict[dispatch_key]
    except KeyError:
        # flag reads that have one or more indels for later treatment,
        # and flag this base to skip substitution treatment
        read_has_indels = True
        substitution = None
    return (query_pos, substitution, read_has_indels)
def subst_matrix_to_choices(substitution_matrix, read_length):
    """Transform a substitution matrix into probabilities of
    substitutions for each base and at every position.

    From the raw mismatches at one position, returns a dictionary with
    probabilities of substitutions.

    Args:
        substitution_matrix (np.array): the substitution matrix is a 2D
            array of size read_length * 16. The x axis (read_length)
            corresponds to the position in the read, while the y axis
            (16) represents the match or substitution. Positions 0, 4, 8
            and 12 are matches, other positions are substitutions.
        read_length (int): read length

    Returns:
        list: list of dictionaries representing the substitution
            probabilities for a collection of reads
    """
    logger = logging.getLogger(__name__)

    def _probabilities(counts):
        """Normalize raw counts to probabilities for one base at one pos."""
        # we want to avoid 'na' in the data so we raise FloatingPointError
        # if we try to divide by 0 (no count data for that nucl at that pos)
        # and assume an equal rate of substitution instead
        with np.errstate(all="raise"):
            try:
                total = np.sum(counts)
                return [count / total for count in counts]
            except FloatingPointError as e:
                logger.debug(e, exc_info=True)
                return [1 / 3, 1 / 3, 1 / 3]

    # for each reference base: its 3 possible substitutions and the
    # columns of the matrix row holding their raw counts
    layout = {
        "A": (["T", "C", "G"], slice(1, 4)),
        "T": (["A", "C", "G"], slice(5, 8)),
        "C": (["A", "T", "G"], slice(9, 12)),
        "G": (["A", "T", "C"], slice(13, None)),
    }
    nucl_choices_list = []
    for pos in range(read_length):
        row = substitution_matrix[pos]
        nucl_choices = {
            base: (targets, _probabilities(row[columns]))
            for base, (targets, columns) in layout.items()
        }
        nucl_choices_list.append(nucl_choices)
    return nucl_choices_list
def dispatch_indels(read):
    """Return the x and y position of an insertion or deletion to be
    inserted in the indel matrix.

    The substitution matrix is a 2D array of size 301 * 9. The x axis
    (301) corresponds to the position in the read, while the y axis (9)
    represents the match or indel (see the dispatch dict in the
    function). Position 0 is match or substitution, other positions in
    'N1' are insertions, 'N2' are deletions. The size of the x axis is
    301 because we haven't calculated the read length yet.

    Args:
        read (read): an aligned read object

    Yields:
        tuple: a tuple with the x, y position for dispatching the indel
            in the indel matrix
    """
    logger = logging.getLogger(__name__)
    indel_map = {0: 0, "A1": 1, "T1": 2, "C1": 3, "G1": 4, "A2": 5, "T2": 6, "C2": 7, "G2": 8}
    position = 0
    for cigar_type, cigar_length in read.cigartuples:
        if cigar_type == 0:  # match: only advance in the query
            position += cigar_length
            continue
        if cigar_type == 1:  # insertion
            key = read.query_sequence[position].upper() + "1"
            step = cigar_length
        elif cigar_type == 2:  # deletion
            key = read.query_alignment_sequence[position].upper() + "2"
            step = -cigar_length
        else:
            logger.debug("CIGAR %s. Skipping read." % cigar_type)
            continue
        anchor = position
        position += step
        indel = indel_map.get(key)
        # ambiguous bases (e.g. N) are not in the map and are skipped,
        # but the position bookkeeping above still applies
        if indel is not None:
            yield (anchor, indel)
def indel_matrix_to_choices(indel_matrix, read_length):
    """Transform an indel matrix into probabilities of indels at every
    position.

    From the raw indel count at one position, returns a dictionary with
    probabilities of indel.

    Args:
        indel_matrix (np.array): the substitution matrix is a 2D array
            of size read_length * 9. The x axis (read_length)
            corresponds to the position in the read, while the y axis
            (9) represents the match or indel. Position 0 is match or
            substitution, other positions in 'N1' are insertions, 'N2'
            are deletions.
        read_length (int): read length

    Returns:
        tuple: tuple containing two lists of dictionaries representing
            the insertion or deletion probabilities for a collection of
            reads
    """
    bases = "ATCG"
    ins_choices = []
    del_choices = []
    for pos in range(read_length):
        row = indel_matrix[pos]
        matches = row[0]
        # columns 1-4 hold insertion counts, 5-8 deletion counts (A,T,C,G)
        ins_choices.append({base: row[i + 1] / matches for i, base in enumerate(bases)})
        del_choices.append({base: row[i + 5] / matches for i, base in enumerate(bases)})
    return (ins_choices, del_choices)