Source code for kbbq.covariate

"""
A data class and helper functions for read covariates.

This module includes the :class:`Covariate` class and subclasses,
as well as the :func:`pad_axis` helper function for appending
to the end of an arbitrary axis of an :class:`numpy.ndarray`.

Classes
    * :class:`Covariate` - a covariate base class
    * :class:`RGCovariate`
    * :class:`QCovariate`
    * :class:`CycleCovariate`
    * :class:`DinucCovariate`
    * :class:`CovariateData` - a container for the covariates

Functions
    * :func:`pad_axis` - helper function to pad an :class:`np.ndarray` with zeros.
"""
from . import read
from . import compare_reads
import numpy as np

[docs]def pad_axis(array, axis, n): """ Pad the axis of :class:`np.ndarray` with n zeros. :param int axis: axis to pad :param int n: size of pad to apply :return: padded array :rtype: :class:`numpy.npdarray` """ return np.append(array, np.zeros(array.shape[0:axis] + (n,) + array.shape[axis+1:]), axis = axis)
[docs]class Covariate(): """ A base class for covariates. Holds 2 :class:`np.ndarray`, :attr:`errors` for number of errors and :attr:`total` for the total number of observations of this covariate. The indices of each array are they keys; ie. the number of times covariate 2 was observed can be found with :code:`self.total[2]`. .. warning:: Don't assign directly to :attr:`errors` or :attr:`total`. :meth:`shape` assumes both arrays have the same shape and only returns the shape of :attr:`total`. If the arrays' shapes become different somehow, :meth:`__getitem__` and :meth:`__setitem__` may start throwing errors unexpectedly. If you want to use two arrays that aren't guaranteed to have the same size, use two arrays. This isn't the right class for that use case. :meth:`__getitem__` will pass the key to both arrays and return a tuple of (errors, total). As a pneumonic, you can remember this as a fraction from left to right, as total should always be larger than errors. :meth:`__setitem__` will also pass the key to both arrays, and will expect value to be a tuple of values, setting the first value to errors and the second to total. Attributes * :attr:`errors` - :class:`numpy.ndarray` of the number of errors observed * :attr:`total` - :class:`numpy.ndarray` of the total number of observations Methods * :meth:`__init__` - Initialize arrays with desired shape. * :meth:`pad_axis` - Pad specified axis of both arrays * :meth:`pad_axis_to_fit` - Pad specified axis so the given index is valid * :meth:`increment` - Increment the given indices of each array. * :meth:`shape` - return the shape of the arrays. * :meth:`__getitem__` - get number of errors and total observations at the given index * :meth:`__setitem__` - set errors and observations at the given index """
[docs] def __init__(self, shape = 0): """ Initialize the arrays with the given shapes. :param shape tuple: desired shape of error and total arrays :return: empty object :rtype: :class:`Covariate` """ self.errors = np.zeros(shape, dtype = np.int) """:class:`np.ndarray` (int) holding number of errors observed for each key.""" self.total = np.zeros(shape, dtype = np.int) """:class:`np.ndarray` (int) holding number of observations for each key."""
[docs] def pad_axis(self, axis, n = 1): """ Expand axis dimension by n. :param int axis: axis to pad :param int n: size of pad to apply """ self.errors = pad_axis(self.errors, axis = axis, n = n) self.total = pad_axis(self.total, axis = axis, n = n)
[docs] def pad_axis_to_fit(self, axis, idx): """ Expand the axis until idx is a valid idx in that dimension. Does nothing if idx is already valid. idx should be a single scalar value, not a slice. :param int axis: axis to pad :param int idx: index to ensure is valid """ axislen = self.shape()[axis] if idx < -axislen or idx >= axislen: #out of bounds if idx < 0: padsize = -idx - axislen else: padsize = idx - axislen + 1 self.pad_axis(axis = axis, n = padsize)
[docs] def increment(self, idx, value = (1,1)): """ Increment the given indices by value. :code:`idx` should be a tuple of values interpretable by :func:`np.add`. The first indices given will increment the error array. The second indices given will increment the total array. Note that this is implemented with :func:`np.add.at`. For example, if :code:`idx[0]` is :code:`[2,2]`, :attr:`errors` [2] will be incremented twice. This is equivalent to .. highlight:: python .. code-block:: python errors[2] += 1 errors[2] += 1 If :code:`value[0]` is 2, :attr:`errors` will be instead incremented by 4 and is equivalent to .. code-block:: python errors[2] += 2 errors[2] += 2 :param idx: indices to increment array at :type idx: tuple( index , index ) :param value: value to add for each index given. :type value: typle( int, int ) """ np.add.at(self.errors, idx[0], value[0]) np.add.at(self.total, idx[1], value[1])
[docs] def shape(self): """ Return the shape of the underlying arrays. :return: array shape :rtype: tuple(int) """ assert self.total.shape == self.errors.shape return self.total.shape
[docs] def __getitem__(self, key): """ Pass key to each :class:`np.ndarray` and return the result as a tuple. The result will be in order of (errors, total). As a pneumonic, you can remember this as a fraction from left to right, as total should always be larger than errors. :param slice key: indices to get :return: number of errors and number of total observations :rtype: tuple(int, int) or tuple( :class:`numpy.ndarray` , :class:`numpy.ndarray` ) """ return (self.errors[key], self.total[key])
[docs] def __setitem__(self, key, value): """ Pass key to each :class:`np.ndarray` and assign each value. Value must be a tuple of values to assign. The first value is assigned to errors and the second to total. :param slice key: indices to set :param tuple(int,int) value: values to set """ self.errors[key] = value[0] self.total[key] = value[1]
[docs]class RGCovariate(Covariate): """ A 1d covariate array in the read group dimension. Methods * :meth:`__init__` - Initialize arrays * :meth:`consume_read` - increment the covariates given a :class:`kbbq.read.ReadData` * :meth:`num_rgs` - return number of rgs observed so far """
[docs] def __init__(self): """ Initialize a 1D read group array. :return: empty object :rtype: :class:`Covariate` """ super().__init__(shape = 0)
[docs] def consume_read(self, read): """ Increment the rg covariates given a :class:`kbbq.read.ReadData` :param read: read to take covariate values from :type read: :class:`kbbq.read.ReadData` :return: erroneous and valid read group values :rtype: tuple( :class:`numpy.ndarray` , :class:`numpy.ndarray` ) """ rge, rgv = read.get_rg_errors() self.pad_axis_to_fit(axis = 0, idx = rgv[0]) self.increment((rge,rgv)) return rge, rgv
[docs] def num_rgs(self): """ Return size needed to hold largest rg observed so far. :return: largest rg + 1 :type: int """ return self.shape()[0]
[docs]class QCovariate(Covariate): """ A 2d covariate array in the read group and Q dimensions. This class holds a :class:`RGCovariate` to efficiently increment both objects while iterating through reads. Attributes * :attr:`rgcov` - RG covariates Methods * :meth:`__init__` - Initialize arrays * :meth:`consume_read` - increment the rg and q covariates given a :class:`kbbq.read.ReadData` * :meth:`num_qs` - return number of qs observed so far """
[docs] def __init__(self): """ Initialize an empty :class:`RGCovariate` and initialize the empty q arrays. """ self.rgcov = RGCovariate() """RG covariates""" super().__init__(shape = (0,0))
[docs] def consume_read(self, read): """ Increment the rg and q covariates given a :class:`kbbq.read.ReadData` :param read: read to take covariate values from :type read: :class:`kbbq.read.ReadData` :return: erroneous and valid read group values and erroneous and valid q values :rtype: (( :class:`numpy.ndarray` , :class:`numpy.ndarray` ),( :class:`numpy.ndarray` , :class:`numpy.ndarray` )) """ rge, rgv = self.rgcov.consume_read(read) self.pad_axis_to_fit(axis = 0, idx = self.rgcov.num_rgs() - 1) qe, qv = read.get_q_errors() try: self.increment(idx = ((rge,qe),(rge,qv))) except IndexError: #this way we only need to find the max if assignment fails self.pad_axis_to_fit(axis = 1, idx = np.amax(qv)) self.increment(idx = ((rge,qe),(rge,qv))) return (rge, rgv), (qe, qv)
[docs] def num_qs(self): """ Return size needed to hold largest q observed so far. :return: largest q + 1 :type: int """ return self.shape()[1]
[docs]class CycleCovariate(Covariate): """ A 3d covariate array in the read group, Q, and cycle dimensions. :meth:`pad_axis` is overriden here since the way cycles are stored means appending to the end of the array changes the location of the data. Methods: * :meth:`__init__` - Initialize empty arrays * :meth:`pad_axis` - Pad given axis. """
[docs] def __init__(self): """ Initialize empty arrays. """ super().__init__(shape = (0,0,0))
[docs] def pad_axis(self, axis, n = 1): """ Expand the axis dimension by n. This is overriden for the CycleCovariate since we use one array to handle both positive and negative cycle values. Appending 0's to the end would ruin the array since negative indices are significant. Thus, we have to wrangle the existing data a bit. :param int axis: axis to pad :param int n: size of pad to apply """ if not (axis == 2 or axis == -1): super().pad_axis(axis = axis, n = n) else: if n % 2 != 0: raise ValueError('n should be even for the 2nd axis ' + 'of a CycleCovariate. n = {} was given.'.format(n)) oldlen = self.shape()[2] newlen = oldlen + n if oldlen == 0: super().pad_axis(axis = axis, n = n) else: half_oldlen = int(oldlen / 2) newerrors = np.zeros(self.shape()[0:2] + (newlen,), dtype = np.int) newtotal = np.zeros(self.shape()[0:2] + (newlen,), dtype = np.int) newerrors[...,0:half_oldlen], newtotal[...,0:half_oldlen] = self[...,0:half_oldlen] newerrors[...,-half_oldlen:], newtotal[...,-half_oldlen:] = self[...,-half_oldlen:] self.errors = newerrors self.total = newtotal
[docs] def num_cycles(self): """ Return number of cycles in the cycle covariate This is the half the length of the cycle vector; there are twice as many cycles since positive and negative cycles are treated differently. :return: half the length of cycle vector :rtype: int """ return self.shape()[-1] / 2
[docs]class DinucCovariate(Covariate): """ A 3d covariate array in the read group, Q, and dinucleotide dimensions. """
[docs] def __init__(self): """ Initialize empty arrays. """ super().__init__(shape = (0,0,len(compare_reads.Dinucleotide.dinucs)))
[docs] def num_dinucs(self): """ Return length of dinucleotide array. :return: number of dinucleotides :rtype: int """ return self.shape()[-1]
[docs]class CovariateData(): """ A class for holding all the covariate data needed to recalibrate reads. Attributes * :attr:`rgcov` - RGCovariate * :attr:`qcov` - QCovariate * :attr:`poscov` - PosCovariate * :attr:`dinuccov` - DinucCovariate Methods * :meth:`consume_read` - Add covariates from the read to the proper data arrays * :meth:`get_num_rgs` - Return number of rgs in the rg covariate * :meth:`get_num_qs` - Return number of qs in the q covariate * :meth:`get_num_cycles` - Return number of cycles in the cycle covariate * :meth:`get_num_dinucs` - Return number of dinucs in the dinuc covariate """
[docs] def __init__(self): """ Initialize covariate arrays. """ self.qcov = QCovariate() """Q covariates""" self.cyclecov = CycleCovariate() """Cycle covariates""" self.dinuccov = DinucCovariate() """Dinucleotide covariates"""
[docs] def consume_read(self, read): """ Add covariates from the read to the proper data arrays. :param read: the read to take data from :type read: :class:`kbbq.read.ReadData` """ (rge, rgv), (qe, qv) = self.qcov.consume_read(read) ce, cv = read.get_cycle_errors() de, dv = read.get_dinuc_errors() num_rgs = self.get_num_rgs() num_qs = self.get_num_qs() for cov in self.cyclecov, self.dinuccov: cov.pad_axis_to_fit(axis = 0, idx = num_rgs - 1) cov.pad_axis_to_fit(axis = 1, idx = num_qs - 1) self.cyclecov.pad_axis_to_fit(axis = 2, idx = 2 * len(read) - 1) self.cyclecov.increment(idx = ((rge,qe,ce),(rgv,qv,cv))) self.dinuccov.increment(idx = ((rge,qe,de),(rgv,qv,dv)))
[docs] def get_num_rgs(self): """ Return number of rgs in the rg covariate :return: length of rg vector :rtype: int """ return self.qcov.rgcov.num_rgs()
[docs] def get_num_qs(self): """ Return number of qs in the q covariate :return: length of q vector :rtype: int """ return self.qcov.num_qs()
[docs] def get_num_cycles(self): """ Return number of cycles in the cycle covariate This is the half the length of the cycle vector; there are twice as many cycles since positive and negative cycles are treated differently. :return: half the length of cycle vector :rtype: int """ return self.cyclecov.num_cycles()
[docs] def get_num_dinucs(self): """ Return number of dinucs in the dinuc covariate. :return: length of the dinuc vector :rtype: int """ return self.dinuccov.num_dinucs()