Source code for iss.generator

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from __future__ import division, unicode_literals
from builtins import range

from iss.util import rev_comp
from Bio import SeqIO
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
from Bio.Alphabet import IUPAC
from Bio.SeqUtils import GC
from shutil import copyfileobj

import os
import sys
import random
import logging
import numpy as np


[docs]def reads(record, ErrorModel, n_pairs, cpu_number, gc_bias=False): """Simulate reads from one genome (or sequence) according to an ErrorModel Each read is a SeqRecord object Return a generator of tuples containing the forward and reverse read. Args: record (SeqRecord): sequence or genome of reference coverage (float): desired coverage of the genome ErrorModel (ErrorModel): an ErrorModel class gc_bias (bool): if set, the function may skip a read due to abnormal GC content Yields: tuple: tuple containg a forward read and a reverse read """ logger = logging.getLogger(__name__) logger.debug( 'Cpu #%s: Generating %s read pairs' % (cpu_number, n_pairs)) read_tuple_list = [] for i in range(n_pairs): try: forward, reverse = simulate_read(record, ErrorModel, i) except ValueError as e: logger.error('Skipping this record: %s' % record.id) return # forward, reverse = simulate_read(record, ErrorModel, i) if gc_bias: stiched_seq = forward.seq + reverse.seq gc_content = GC(stiched_seq) if 40 < gc_content < 60: read_tuple_list.append((forward, reverse)) elif np.random.rand() < 0.90: read_tuple_list.append((forward, reverse)) else: continue else: read_tuple_list.append((forward, reverse)) temp_file_name = '.iss.tmp.%s.%s' % (record.id, cpu_number) to_fastq(read_tuple_list, temp_file_name) return temp_file_name
[docs]def simulate_read(record, ErrorModel, i): """From a sequence record and an ErrorModel, generate a read pair EXPERIMENTAL. SHOULD BE MULTI-THREADABLE """ logger = logging.getLogger(__name__) sequence = record.seq header = record.id read_length = ErrorModel.read_length insert_size = ErrorModel.random_insert_size() # generate the forward read try: # a ref sequence has to be longer than 2 * read_length + i_size forward_start = random.randrange( 0, len(record.seq) - (2 * read_length + insert_size)) except ValueError as e: logger.error( '%s shorter than template length for this ErrorModel:%s' % (record.id, e)) raise forward_end = forward_start + read_length bounds = (forward_start, forward_end) # create a perfect read forward = SeqRecord( Seq(str(sequence[forward_start:forward_end]), IUPAC.unambiguous_dna ), id='%s_%s_1' % (header, i), description='' ) # add the indels, the qual scores and modify the record accordingly forward.seq = ErrorModel.introduce_indels( forward, 'forward', sequence, bounds) forward = ErrorModel.introduce_error_scores(forward, 'forward') forward.seq = ErrorModel.mut_sequence(forward, 'forward') # generate the reverse read reverse_start = forward_end + insert_size reverse_end = reverse_start + read_length bounds = (reverse_start, reverse_end) # create a perfect read reverse = SeqRecord( Seq(rev_comp(str(sequence[reverse_start:reverse_end])), IUPAC.unambiguous_dna ), id='%s_%s_2' % (header, i), description='' ) # add the indels, the qual scores and modify the record accordingly reverse.seq = ErrorModel.introduce_indels( reverse, 'reverse', sequence, bounds) reverse = ErrorModel.introduce_error_scores(reverse, 'reverse') reverse.seq = ErrorModel.mut_sequence(reverse, 'reverse') return (forward, reverse)
[docs]def to_fastq(generator, output): """Write reads to fastq Take the read generator and write read pairs in two fastq files: output_R1.fastq and output_R2.fastq Args: generator (generator): the read generator output (string): the output files prefix """ logger = logging.getLogger(__name__) # define name of output files output_forward = output + '_R1.fastq' output_reverse = output + '_R2.fastq' try: f = open(output_forward, 'a') r = open(output_reverse, 'a') except PermissionError as e: logger.error('Failed to open output file(s): %s' % e) sys.exit(1) else: with f, r: for read_tuple in generator: SeqIO.write(read_tuple[0], f, 'fastq-sanger') SeqIO.write(read_tuple[1], r, 'fastq-sanger')
[docs]def concatenate(file_list, output): """Concatenate fastq files together Outputs two files: output_R1.fastq and output_R2.fastq Args: file_list (list): the list of input files prefix output (string): the output files prefix """ logger = logging.getLogger(__name__) logger.info('Stitching temporary files together') # define name of output files output_forward = output + '_R1.fastq' output_reverse = output + '_R2.fastq' try: out_f = open(output_forward, 'wb') out_r = open(output_reverse, 'wb') except PermissionError as e: logger.error('Failed to open output file(s): %s' % e) sys.exit(1) with out_f, out_r: for file_name in file_list: if file_name is not None: temp_f = file_name + '_R1.fastq' temp_r = file_name + '_R2.fastq' with open(temp_f, 'rb') as f, open(temp_r, 'rb') as r: copyfileobj(f, out_f) copyfileobj(r, out_r)
[docs]def cleanup(file_list): """remove temporary files Args: file_list (list): a list of files to be removed """ logger = logging.getLogger(__name__) logger.info('Cleaning up') for temp_file in file_list: if temp_file is not None: try: os.remove(temp_file + '_R1.fastq') os.remove(temp_file + '_R2.fastq') except FileNotFoundError as e: logger.error('Temporary file not found: %s' % temp_file) logger.error('You may have to remove temporary files manually') sys.exit(1)