#!/usr/bin/env python
# -*- coding: utf-8 -*-
import gzip
import logging
import os
import pickle
import random
import sys
from shutil import copyfileobj
import numpy as np
from Bio import SeqIO
[docs]
def phred_to_prob(q):
"""Convert a phred score (Sanger or modern Illumina) in probabilty
Given a phred score q, return the probabilty p
of the call being right
Args:
q (int): phred score
Returns:
float: probabilty of basecall being right
"""
p = 10 ** (-q / 10)
return 1 - p
[docs]
def prob_to_phred(p):
"""Convert a probabilty into a phred score (Sanger or modern Illumina)
Given a probabilty p of the basecall being right, return the
phred score q
Args:
p (int): probabilty of basecall being right
Returns:
int: phred score
"""
q = int(round(-10 * np.log10(1 - p)))
return q
[docs]
def rev_comp(s):
"""A simple reverse complement implementation working on strings
Args:
s (string): a DNA sequence (IUPAC, can be ambiguous)
Returns:
list: reverse complement of the input sequence
"""
bases = {
"a": "t",
"c": "g",
"g": "c",
"t": "a",
"y": "r",
"r": "y",
"w": "w",
"s": "s",
"k": "m",
"m": "k",
"n": "n",
"b": "v",
"v": "b",
"d": "h",
"h": "d",
"A": "T",
"C": "G",
"G": "C",
"T": "A",
"Y": "R",
"R": "Y",
"W": "W",
"S": "S",
"K": "M",
"M": "K",
"N": "N",
"B": "V",
"V": "B",
"D": "H",
"H": "D",
}
sequence = list(s)
complement = "".join([bases[b] for b in sequence])
reverse_complement = complement[::-1]
return reverse_complement
[docs]
def count_records(fasta_file):
"""Count the number of records in a fasta file and return a list of
recods id
Args:
fasta_file (string): the path to a fasta file
Returns:
list: a list of record ids
"""
logger = logging.getLogger(__name__)
record_list = []
for record in SeqIO.parse(fasta_file, "fasta"):
record_list.append(record.id)
try:
assert len(record_list) != 0
except AssertionError:
logger.error("Failed to find records in genome(s) file:%s" % fasta_file)
sys.exit(1)
else:
return record_list
[docs]
def split_list(lst, n_parts=1):
"""Split a list in a number of parts
Args:
l (list): a list
n_parts (in): the number of parts to split the list in
Returns:
list: a list of n_parts lists
"""
length = len(lst)
return [lst[i * length // n_parts : (i + 1) * length // n_parts] for i in range(n_parts)]
[docs]
def nplog(type, flag):
logger = logging.getLogger(__name__)
logger.debug("FloatingPointError (%s), with flag %s" % (type, flag))
[docs]
def convert_n_reads(unit):
"""For strings representing a number of bases and ending with k, K, m, M,
g, and G converts to a plain old number
Args:
n (str): a string representing a number ending with a suffix
Returns:
float: a number of reads
"""
logger = logging.getLogger(__name__)
suffixes = {"k": 3, "m": 6, "g": 9}
if unit[-1].isdigit():
try:
unit_int = int(unit)
except ValueError:
logger.error("%s is not a valid number of reads" % unit)
sys.exit(1)
elif unit[-1].lower() in suffixes:
number = unit[:-1]
exponent = suffixes[unit[-1].lower()]
unit_int = int(float(number) * 10**exponent)
else:
logger.error("%s is not a valid number of reads" % unit)
sys.exit(1)
return unit_int
[docs]
def genome_file_exists(filename):
"""Checks if the output file from the --ncbi option already exists
Args:
filename (str): a file name
"""
logger = logging.getLogger(__name__)
try:
assert os.path.exists(filename) is False
except AssertionError:
logger.error("%s already exists. Aborting." % filename)
logger.error("Maybe use another --output prefix")
sys.exit(1)
[docs]
def reservoir(records, record_list, n=None):
"""yield a number of records from a fasta file using reservoir sampling
Args:
records (obj): fasta records from SeqIO.parse
Yields:
record (obj): a fasta record
"""
logger = logging.getLogger(__name__)
if n is not None:
try:
total = len(record_list)
assert n < total
except AssertionError:
logger.error("-u should be strictly smaller than total number of records.")
sys.exit(1)
else:
random.seed()
x = 0
samples = sorted(random.sample(range(0, total - 1), n))
for sample in samples:
while x < sample:
x += 1
_ = records.__next__()
record = records.__next__()
x += 1
yield record
else:
for record in records:
yield record
[docs]
def concatenate(file_list, output, header=None):
"""Concatenate files together
Args:
file_list (list): the list of input files (can be a generator)
output (string): the output file name
"""
logger = logging.getLogger(__name__)
logger.info("Stitching input files together")
try:
out_file = open(output, "wb")
except (IOError, OSError) as e:
logger.error("Failed to open output file: %s" % e)
sys.exit(1)
with out_file:
if header is not None:
out_file.write(str.encode(header + "\n"))
for file_name in file_list:
if file_name is not None:
with open(file_name, "rb") as f:
copyfileobj(f, out_file)
[docs]
def cleanup(file_list):
"""remove temporary files
Args:
file_list (list): a list of files to be removed
"""
logger = logging.getLogger(__name__)
logger.info("Cleaning up")
for temp_file in file_list:
if temp_file is not None:
try:
os.remove(temp_file)
except (IOError, OSError):
logger.error("Could not read temporary file: %s" % temp_file)
logger.error("You may have to remove temporary files manually")
sys.exit(1)
[docs]
def compress(filename, remove=True):
"""gzip a file
Args:
filename (string): name of file to be compressed
"""
logger = logging.getLogger(__name__)
logger.info("Compressing %s" % filename)
outfile = filename + ".gz"
with open(filename, "rb") as i, gzip.open(outfile, "wb") as o:
copyfileobj(i, o)
if remove:
cleanup([filename])
return outfile
[docs]
def dump(object, output):
"""dump an object, like pickle.dump.
This function uses pickle.dumps to dump large objects
Args:
object (object): a python object
"""
MAX_BYTES = 2**31 - 1
pickled_object = pickle.dumps(object, protocol=pickle.HIGHEST_PROTOCOL)
size = sys.getsizeof(pickled_object)
with open(output, "wb") as out_file:
for i in range(0, size, MAX_BYTES):
out_file.write(pickled_object[i : i + MAX_BYTES])
[docs]
def load(filename):
"""load a pickle from disk
This function uses pickle.loads to load large objects
Args:
filename (string): the path of the pickle to load
"""
MAX_BYTES = 2**31 - 1
size = os.path.getsize(filename)
bytes = bytearray(0)
with open(filename, "rb") as f:
for _ in range(0, size, MAX_BYTES):
bytes += f.read(MAX_BYTES)
object = pickle.loads(bytes)
return object