Source code for iss.abundance
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import division
from scipy import stats
import os
import sys
import logging
import numpy as np
from Bio import SeqIO
[docs]def parse_abundance_file(abundance_file):
"""Parse an abundance or coverage file
The abundance/coverage file is a flat file of the format
"genome_id<TAB>abundance"
or
"genome_id<TAB>coverage"
Args:
abundance_file (string): the path to the abundance file
Returns:
dict: genome_id as keys, abundance as values
"""
logger = logging.getLogger(__name__)
abundance_dic = {}
try:
assert os.stat(abundance_file).st_size != 0
f = open(abundance_file, 'r')
except (IOError, OSError) as e:
logger.error('Failed to open abundance file:%s' % e)
sys.exit(1)
except AssertionError as e:
logger.error('Abundance file seems empty: %s' % abundance_file)
sys.exit(1)
else:
with f:
for line in f:
try:
genome_id = line.split()[0]
abundance = float(line.split()[1])
except IndexError as e:
logger.error('Failed to read abundance file: %s' % e)
sys.exit(1)
else:
abundance_dic[genome_id] = abundance
logger.info('Loaded abundance file: %s' % abundance_file)
return abundance_dic
[docs]def halfnormal(record_list):
"""Generate scaled halfnormal abundance distribution from a number of
records
Args:
record_list (list): a list of record.id
Returns:
dict: a dictionary with records as keys, abundance as values
"""
abundance_dic = {}
n_records = len(record_list)
dist = stats.halfnorm.rvs(loc=0.00, scale=1.00, size=n_records)
dist_scaled = dist / sum(dist)
for record, abundance in zip(record_list, dist_scaled):
abundance_dic[record] = abundance
return abundance_dic
[docs]def exponential(record_list):
"""Generate scaled exponential abundance distribution from a number of
records
Args:
record_list (list): a list of record.id
Returns:
dict: a dictionary with records as keys, abundance as values
"""
abundance_dic = {}
n_records = len(record_list)
dist = np.random.exponential(size=n_records)
dist_scaled = dist / sum(dist)
for record, abundance in zip(record_list, dist_scaled):
abundance_dic[record] = abundance
return abundance_dic
[docs]def lognormal(record_list):
"""Generate scaled lognormal abundance distribution from a number of
records
Args:
record_list (list): a list of record.id
Returns:
dict: a dictionary with records as keys, abundance as values
"""
abundance_dic = {}
n_records = len(record_list)
dist = np.random.lognormal(size=n_records)
dist_scaled = dist / sum(dist)
for record, abundance in zip(record_list, dist_scaled):
abundance_dic[record] = abundance
return abundance_dic
[docs]def zero_inflated_lognormal(record_list):
"""Generate scaled zero-inflated lognormal abundance distribution from a
number of records
Args:
record_list (list): a list of record.id
Returns:
dict: a dictionary with records as keys, abundance as values
"""
abundance_dic = {}
n_records = len(record_list)
zero_inflated = stats.bernoulli.rvs(p=0.2, size=n_records)
dist = (1 - zero_inflated) * np.random.lognormal(size=n_records)
dist_scaled = dist / sum(dist)
for record, abundance in zip(record_list, dist_scaled):
abundance_dic[record] = abundance
return abundance_dic
[docs]def to_coverage(total_n_reads, species_abundance, read_length, genome_size):
"""Calculate the coverage of a genome in a metagenome given its size and
abundance
Args:
total_n_reads (int): total amount of reads in the dataset
species_abundance (float): abundance of the species, between 0 and 1
read_length (int): length of the reads in the dataset
genome_size (int): size of the genome
Returns:
float: coverage of the genome
"""
n_reads = total_n_reads * species_abundance
coverage = (n_reads * read_length) / genome_size
return coverage
[docs]def to_file(abundance_dic, output):
"""Write the abundance dictionary to a file
Args:
abundance_dic (dict): the abundance dictionary
output (str): the output file name
"""
logger = logging.getLogger(__name__)
output_abundance = output + '_abundance.txt'
try:
f = open(output_abundance, 'w')
except PermissionError as e:
logger.error('Failed to open output file: %s' % e)
sys.exit(1)
else:
with f:
for record, abundance in abundance_dic.items():
f.write('%s\t%s\n' % (record, abundance))
[docs]def draft(genomes, draft, distribution, output):
"""Computes the abundance dictionary for a mix of complete and
draft genomes
Args:
genomes (list): list of all input records
draft (list): draft genome files
distribution (function): distribution function to use
output (str): output file
Returns:
dict: the abundance dictionary
"""
# first we get a list of contig names in draft genomes
draft_records = []
for d in draft:
draft_records.extend(
[record.name for record in SeqIO.parse(d, 'fasta')])
genomes = list(set(genomes) - set(draft_records))
abundance_dic = distribution(genomes + draft)
complete_genomes_abundance = {k: v for
k, v in abundance_dic.items()
if k not in draft}
to_file(abundance_dic, output)
for key, abundance in abundance_dic.items():
draft_dic = {}
if key in draft:
try:
f = open(key, 'r')
with f:
fasta_file = SeqIO.parse(f, 'fasta')
draft_records = [record for record in fasta_file]
total_length = sum(len(record) for record in draft_records)
except Exception as e:
raise
else:
total_length = sum(len(record) for record in draft_records)
for record in draft_records:
length = len(record)
contig_abundance = abundance * (length / total_length)
# print(key, record.id, contig_abundance)
draft_dic[record.id] = contig_abundance
# python 3.5 +
# full_abundance_dic = {**complete_genomes_abundance, **draft_dic}
full_abundance_dic = complete_genomes_abundance.copy()
full_abundance_dic.update(draft_dic)
return full_abundance_dic