Source code for iss.util

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import gzip
import logging
import os
import pickle
import random
import sys
from shutil import copyfileobj

import numpy as np
from Bio import SeqIO


def phred_to_prob(q):
    """Convert a phred score (Sanger or modern Illumina) into a probability

    Given a phred score q, return the probability p of the call being right

    Args:
        q (int): phred score

    Returns:
        float: probability of the basecall being right
    """
    p = 10 ** (-q / 10)
    return 1 - p

def prob_to_phred(p):
    """Convert a probability into a phred score (Sanger or modern Illumina)

    Given a probability p of the basecall being right, return the phred
    score q

    Args:
        p (float): probability of the basecall being right

    Returns:
        int: phred score
    """
    q = int(round(-10 * np.log10(1 - p)))
    return q

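# Illustrative example (not part of iss.util): the two conversions round-trip.
# A phred score of 30 corresponds to a 1-in-1000 chance of a wrong basecall:
#
#     >>> phred_to_prob(30)
#     0.999
#     >>> prob_to_phred(0.999)
#     30
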
def rev_comp(s):
    """A simple reverse complement implementation working on strings

    Args:
        s (string): a DNA sequence (IUPAC, can be ambiguous)

    Returns:
        string: reverse complement of the input sequence
    """
    bases = {
        "a": "t", "c": "g", "g": "c", "t": "a",
        "y": "r", "r": "y", "w": "w", "s": "s",
        "k": "m", "m": "k", "n": "n", "b": "v",
        "v": "b", "d": "h", "h": "d",
        "A": "T", "C": "G", "G": "C", "T": "A",
        "Y": "R", "R": "Y", "W": "W", "S": "S",
        "K": "M", "M": "K", "N": "N", "B": "V",
        "V": "B", "D": "H", "H": "D",
    }
    sequence = list(s)
    complement = "".join([bases[b] for b in sequence])
    reverse_complement = complement[::-1]
    return reverse_complement

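# Illustrative example (not part of iss.util): ambiguous IUPAC codes are
# complemented too, and case is preserved:
#
#     >>> rev_comp("ACGTn")
#     'nACGT'
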
def count_records(fasta_file):
    """Count the number of records in a fasta file and return a list of
    record ids

    Args:
        fasta_file (string): the path to a fasta file

    Returns:
        list: a list of record ids
    """
    logger = logging.getLogger(__name__)
    record_list = []
    for record in SeqIO.parse(fasta_file, "fasta"):
        record_list.append(record.id)
    try:
        assert len(record_list) != 0
    except AssertionError:
        logger.error("Failed to find records in genome(s) file: %s" % fasta_file)
        sys.exit(1)
    else:
        return record_list

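# Illustrative example (assumes a hypothetical "genomes.fasta" containing two
# records named genome_A and genome_B; not part of iss.util):
#
#     >>> count_records("genomes.fasta")
#     ['genome_A', 'genome_B']
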
def split_list(lst, n_parts=1):
    """Split a list into a number of parts

    Args:
        lst (list): a list
        n_parts (int): the number of parts to split the list into

    Returns:
        list: a list of n_parts lists
    """
    length = len(lst)
    return [
        lst[i * length // n_parts : (i + 1) * length // n_parts]
        for i in range(n_parts)
    ]

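# Illustrative example (not part of iss.util): uneven splits put the extra
# elements in the later parts:
#
#     >>> split_list([1, 2, 3, 4, 5], n_parts=2)
#     [[1, 2], [3, 4, 5]]
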
def nplog(type, flag):
    """Log numpy floating point errors at debug level

    Intended as an error handler passed to np.seterrcall.

    Args:
        type (str): a string describing the type of the floating point error
        flag (int): the status flag of the error
    """
    logger = logging.getLogger(__name__)
    logger.debug("FloatingPointError (%s), with flag %s" % (type, flag))

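# Illustrative usage (a sketch based on numpy's error-handling API; not part
# of iss.util): register nplog so numpy floating point errors are logged
# instead of printed:
#
#     np.seterrcall(nplog)
#     np.seterr(all="call")
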
def convert_n_reads(unit):
    """Convert a string representing a number of reads, optionally ending
    with k, K, m, M, g or G, into a plain integer

    Args:
        unit (str): a string representing a number, with an optional
            k/m/g suffix

    Returns:
        int: a number of reads
    """
    logger = logging.getLogger(__name__)
    suffixes = {"k": 3, "m": 6, "g": 9}
    if unit[-1].isdigit():
        try:
            unit_int = int(unit)
        except ValueError:
            logger.error("%s is not a valid number of reads" % unit)
            sys.exit(1)
    elif unit[-1].lower() in suffixes:
        number = unit[:-1]
        exponent = suffixes[unit[-1].lower()]
        unit_int = int(float(number) * 10**exponent)
    else:
        logger.error("%s is not a valid number of reads" % unit)
        sys.exit(1)
    return unit_int

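# Illustrative example (not part of iss.util): suffixes are case-insensitive
# and may follow a decimal number:
#
#     >>> convert_n_reads("10k")
#     10000
#     >>> convert_n_reads("0.5M")
#     500000
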
def genome_file_exists(filename):
    """Checks if the output file from the --ncbi option already exists

    Args:
        filename (str): a file name
    """
    logger = logging.getLogger(__name__)
    try:
        assert os.path.exists(filename) is False
    except AssertionError:
        logger.error("%s already exists. Aborting." % filename)
        logger.error("Maybe use another --output prefix")
        sys.exit(1)

def reservoir(records, record_list, n=None):
    """yield a number of records from a fasta file using reservoir sampling

    Args:
        records (obj): fasta records from SeqIO.parse
        record_list (list): a list of record ids
        n (int): the number of records to sample. If None, all records
            are yielded

    Yields:
        record (obj): a fasta record
    """
    logger = logging.getLogger(__name__)
    if n is not None:
        try:
            total = len(record_list)
            assert n < total
        except AssertionError:
            logger.error("-u should be strictly smaller than total number of records.")
            sys.exit(1)
        else:
            random.seed()
            x = 0
            samples = sorted(random.sample(range(0, total - 1), n))
            for sample in samples:
                while x < sample:
                    x += 1
                    _ = next(records)
                record = next(records)
                x += 1
                yield record
    else:
        for record in records:
            yield record

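# Illustrative example (assumes a hypothetical "genomes.fasta" with more than
# two records; not part of iss.util): sample two random records from the file:
#
#     record_ids = count_records("genomes.fasta")
#     records = SeqIO.parse("genomes.fasta", "fasta")
#     for record in reservoir(records, record_ids, n=2):
#         print(record.id)
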
def concatenate(file_list, output, header=None):
    """Concatenate files together

    Args:
        file_list (list): the list of input files (can be a generator)
        output (string): the output file name
        header (string): an optional header line to write before the
            concatenated content
    """
    logger = logging.getLogger(__name__)
    logger.info("Stitching input files together")
    try:
        out_file = open(output, "wb")
    except (IOError, OSError) as e:
        logger.error("Failed to open output file: %s" % e)
        sys.exit(1)
    with out_file:
        if header is not None:
            out_file.write(str.encode(header + "\n"))
        for file_name in file_list:
            if file_name is not None:
                with open(file_name, "rb") as f:
                    copyfileobj(f, out_file)

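# Illustrative example (file names hypothetical; not part of iss.util): stitch
# per-process temporary fastq files into the final output:
#
#     concatenate(
#         ["reads.iss.tmp.0_R1.fastq", "reads.iss.tmp.1_R1.fastq"],
#         "reads_R1.fastq",
#     )
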
def cleanup(file_list):
    """remove temporary files

    Args:
        file_list (list): a list of files to be removed
    """
    logger = logging.getLogger(__name__)
    logger.info("Cleaning up")
    for temp_file in file_list:
        if temp_file is not None:
            try:
                os.remove(temp_file)
            except (IOError, OSError):
                logger.error("Could not remove temporary file: %s" % temp_file)
                logger.error("You may have to remove temporary files manually")
                sys.exit(1)

def compress(filename, remove=True):
    """gzip a file

    Args:
        filename (string): name of the file to be compressed
        remove (bool): whether to remove the original file after
            compression. Defaults to True

    Returns:
        string: the name of the compressed file
    """
    logger = logging.getLogger(__name__)
    logger.info("Compressing %s" % filename)
    outfile = filename + ".gz"
    with open(filename, "rb") as i, gzip.open(outfile, "wb") as o:
        copyfileobj(i, o)
    if remove:
        cleanup([filename])
    return outfile

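# Illustrative example (file name hypothetical; not part of iss.util):
#
#     compress("reads_R1.fastq")                # leaves only reads_R1.fastq.gz
#     compress("reads_R1.fastq", remove=False)  # keeps the uncompressed file
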
def dump(object, output):
    """dump an object, like pickle.dump.

    This function uses pickle.dumps to dump large objects, writing the
    pickled bytes to disk in chunks of at most 2**31 - 1 bytes

    Args:
        object (object): a python object
        output (string): the path of the output file
    """
    MAX_BYTES = 2**31 - 1
    pickled_object = pickle.dumps(object, protocol=pickle.HIGHEST_PROTOCOL)
    size = sys.getsizeof(pickled_object)
    with open(output, "wb") as out_file:
        for i in range(0, size, MAX_BYTES):
            out_file.write(pickled_object[i : i + MAX_BYTES])

def load(filename):
    """load a pickle from disk

    This function uses pickle.loads to load large objects, reading the
    pickled bytes from disk in chunks of at most 2**31 - 1 bytes

    Args:
        filename (string): the path of the pickle to load

    Returns:
        object: the unpickled python object
    """
    MAX_BYTES = 2**31 - 1
    size = os.path.getsize(filename)
    bytes = bytearray(0)
    with open(filename, "rb") as f:
        for _ in range(0, size, MAX_BYTES):
            bytes += f.read(MAX_BYTES)
    object = pickle.loads(bytes)
    return object
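
# Illustrative round trip (path hypothetical; not part of iss.util): dump and
# load chunk the pickled bytes so objects larger than 2**31 - 1 bytes survive
# the single-call read/write limits of some platforms:
#
#     dump({"quality_hist": np.zeros(40)}, "/tmp/model.iss.tmp")
#     model = load("/tmp/model.iss.tmp")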