Source code for iss.generator

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import gc
import logging
import os
import random
import sys

import numpy as np
from Bio import SeqIO
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
from Bio.SeqUtils import gc_fraction

from iss import abundance, download, util
from iss.error_models import basic, kde, perfect
from iss.util import load, rev_comp


[docs] def simulate_reads( record, error_model, n_pairs, cpu_number, forward_handle, reverse_handle, mutations_handle, sequence_type, gc_bias=False, mode="default", ): """Simulate reads from one genome (or sequence) according to an ErrorModel This function makes use of the `simulate_read` function to simulate reads and save them in a fastq file Args: record (SeqRecord): sequence or genome of reference error_model (ErrorModel): an ErrorModel n_pairs (int): the number of reads to generate cpu_number (int): an int indentifying the cpu that is used by the function. Is used for naming the output file forward_handle (file): a file handle to write the forward reads to reverse_handle (file): a file handle to write the reverse reads to mutations_handle (file): a file handle to write the mutations to sequencing_type (str): metagenomics or amplicon sequencing used gc_bias (bool): if set, the function may skip a read due to abnormal GC content """ logger = logging.getLogger(__name__) # load the record from disk if mode is memmap if mode == "memmap": record_mmap = load(record) record = record_mmap logger.debug("Cpu #%s: Generating %s read pairs" % (cpu_number, n_pairs)) for forward_record, reverse_record, mutations in reads_generator( n_pairs, record, error_model, cpu_number, gc_bias, sequence_type ): SeqIO.write(forward_record, forward_handle, "fastq-sanger") SeqIO.write(reverse_record, reverse_handle, "fastq-sanger") write_mutations(mutations, mutations_handle)
[docs] def reads_generator(n_pairs, record, error_model, cpu_number, gc_bias, sequence_type): logger = logging.getLogger(__name__) i = 0 while i < n_pairs: try: forward, reverse, mutations = simulate_read(record, error_model, i, cpu_number, sequence_type) except AssertionError: logger.warning("%s shorter than read length for this ErrorModel" % record.id) logger.warning("Skipping %s. You will have less reads than specified" % record.id) break else: if gc_bias: stiched_seq = forward.seq + reverse.seq gc_content = gc_fraction(stiched_seq) if 40 < gc_content < 60: yield (forward, reverse, mutations) i += 1 elif np.random.rand() < 0.90: yield (forward, reverse, mutations) i += 1 else: continue else: yield (forward, reverse, mutations) i += 1
[docs] def simulate_read(record, error_model, i, cpu_number, sequence_type): """From a read pair from one genome (or sequence) according to an ErrorModel Each read is a SeqRecord object returns a tuple containing the forward and reverse read. Args: record (SeqRecord): sequence or genome of reference error_model (ErrorModel): an ErrorModel class i (int): a number identifying the read cpu_number (int): cpu number. Is added to the read id. sequence_type (str): metagenomics or amplicon sequencing used Returns: tuple: tuple containg a forward read and a reverse read """ logger = logging.getLogger(__name__) sequence = record.seq header = record.id read_length = error_model.read_length if error_model.fragment_length is not None and error_model.fragment_sd is not None: fragment_length = int(np.random.normal(error_model.fragment_length, error_model.fragment_sd)) insert_size = fragment_length - (read_length * 2) else: insert_size = error_model.random_insert_size() fragment_length = insert_size + (read_length * 2) # generate the forward read try: # a ref sequence has to be longer than 2 * read_length + i_size assert read_length < len(record.seq) # assign the start position of the forward read # if sequence_type == metagenomics, get a random start position # if sequence_type == amplicon, start position is the start of the read if sequence_type == "metagenomics": forward_start = random.randrange(0, len(record.seq) - fragment_length) elif sequence_type == "amplicon": forward_start = 0 else: raise RuntimeError(f"sequence type '{sequence_type}' is not supported") except AssertionError: raise except ValueError as e: logger.debug("%s shorter than template length for this ErrorModel:%s" % (record.id, e)) forward_start = max(0, random.randrange(0, len(record.seq) - read_length)) forward_end = forward_start + read_length bounds = (forward_start, forward_end) # create a perfect read forward = SeqRecord( Seq(str(sequence[forward_start:forward_end])), id="%s_%s_%s/1" % (header, i, cpu_number), description="" ) forward.annotations["mutations"] = [] forward.annotations["original"] = str(forward.seq) # add the indels, the qual scores and modify the record accordingly forward = error_model.introduce_indels(forward, "forward", sequence, bounds) forward = error_model.introduce_error_scores(forward, "forward") forward = error_model.mut_sequence(forward, "forward") # generate the reverse read # assign start position reverse read # if sequence_type == metagenomics, get a start position based on insert_size # if sequence_type == amplicon, start position is the end of the read if sequence_type == "metagenomics": reverse_start = forward_end + insert_size reverse_end = reverse_start + read_length elif sequence_type == "amplicon": reverse_start = len(record.seq) - read_length reverse_end = reverse_start + read_length else: raise ValueError(f"Sequence type {sequence_type} not known") if reverse_end > len(record.seq): # we use random insert when the modelled template length distribution # is too large reverse_end = random.randrange(read_length, len(record.seq)) reverse_start = reverse_end - read_length bounds = (reverse_start, reverse_end) # create a perfect read reverse = SeqRecord( Seq(rev_comp(str(sequence[reverse_start:reverse_end]))), id="%s_%s_%s/2" % (header, i, cpu_number), description="", ) reverse.annotations["mutations"] = [] reverse.annotations["original"] = str(reverse.seq) # add the indels, the qual scores and modify the record accordingly reverse = error_model.introduce_indels(reverse, "reverse", sequence, bounds) reverse = error_model.introduce_error_scores(reverse, "reverse") reverse = error_model.mut_sequence(reverse, "reverse") return (forward, reverse, forward.annotations["mutations"] + reverse.annotations["mutations"])
[docs] def to_fastq(generator, output): """Write reads to a fastq file Take a generator or a list containing read pairs (tuples) and write them in two fastq files: output_R1.fastq and output_R2.fastq Args: generator (generator): a read generator (or list) output (string): the output files prefix """ logger = logging.getLogger(__name__) # define name of output files output_forward = output + "_R1.fastq" output_reverse = output + "_R2.fastq" try: f = open(output_forward, "a") r = open(output_reverse, "a") except PermissionError as e: logger.error("Failed to open output file(s): %s" % e) sys.exit(1) else: with f, r: for read_tuple in generator: SeqIO.write(read_tuple[0], f, "fastq-sanger") SeqIO.write(read_tuple[1], r, "fastq-sanger")
[docs] def worker_iterator(work, error_model, cpu_number, worker_prefix, seed, sequence_type, gc_bias): """A utility function to run the reads simulation of each record in a loop for a specific cpu""" logger = logging.getLogger(__name__) try: forward_handle = open(f"{worker_prefix}_R1.fastq", "w") reverse_handle = open(f"{worker_prefix}_R2.fastq", "w") mutation_handle = open(f"{worker_prefix}.vcf", "w") except PermissionError as e: logger.error("Failed to write temporary output file(s): %s" % e) sys.exit(1) if seed is not None: random.seed(seed + cpu_number) np.random.seed(seed + cpu_number) with forward_handle, reverse_handle, mutation_handle: for record, n_pairs, mode in work: simulate_reads( record=record, n_pairs=n_pairs, mode=mode, error_model=error_model, cpu_number=cpu_number, forward_handle=forward_handle, reverse_handle=reverse_handle, mutations_handle=mutation_handle, sequence_type=sequence_type, gc_bias=gc_bias, )
[docs] def generate_work_divider( fasta_file, readcount_dic, abundance_dic, n_reads, coverage, coverage_file, error_model, output, chunk_size ): """Yields a list of tuples containing the records and the number of reads to generate for each record Yields: list[tuple[SeqIO.Record, int, str]]: a list of tuples containing the records, the number of reads to generate for each record and the mode """ logger = logging.getLogger(__name__) current_chunk = 0 total_reads_generated = 0 total_reads_generated_unrounded = 0 chunk_work = [] for record in fasta_file: # generate reads for records if readcount_dic is not None: if record.id not in readcount_dic: logger.warning(f"Record {record.id} not found in readcount file") continue n_pairs_unrounded = readcount_dic[record.id] / 2 elif abundance_dic is not None: if record.id not in abundance_dic: logger.warning(f"Record {record.id} not found in abundance file") continue record_abundance = abundance_dic[record.id] genome_size = len(record.seq) if coverage or coverage_file: record_coverage = record_abundance else: record_coverage = abundance.to_coverage( n_reads, record_abundance, error_model.read_length, genome_size, ) n_pairs_unrounded = ((record_coverage * len(record.seq)) / error_model.read_length) / 2 else: raise RuntimeError("No readcount or abundance file provided") # check that the rounding does not cause to drop read pairs n_pairs = round(n_pairs_unrounded) total_reads_generated_unrounded += n_pairs_unrounded total_reads_generated += n_pairs if round(total_reads_generated_unrounded) > total_reads_generated: logger.debug("Adding a pair to correct rounding error") n_pairs += 1 total_reads_generated += 1 logger.debug("Will generate %s read pairs for %s" % (n_pairs, record.id)) if n_pairs == 0: continue # due to a bug in multiprocessing # https://bugs.python.org/issue17560 # we can't send records taking more than 2**31 bytes # through serialisation. # In those cases we use memmapping if sys.getsizeof(str(record.seq)) >= 2**31 - 1: logger.warning("record %s unusually big." % record.id) logger.warning("Using a memory map.") mode = "memmap" record_mmap = "%s.memmap" % output if os.path.exists(record_mmap): os.unlink(record_mmap) util.dump(record, record_mmap) del record record = record_mmap gc.collect() else: mode = "default" n_pairs_remaining = n_pairs while n_pairs_remaining > 0: chunk_remaining = chunk_size - current_chunk if n_pairs_remaining <= chunk_remaining: # Record fits in the current chunk chunk_work.append((record, n_pairs_remaining, mode)) n_pairs_added = n_pairs_remaining else: # Record does not fit in the current chunk chunk_work.append((record, chunk_remaining, mode)) n_pairs_added = chunk_remaining n_pairs_remaining -= n_pairs_added current_chunk += n_pairs_added if current_chunk == chunk_size: yield chunk_work chunk_work = [] current_chunk = 0 if chunk_work: # Yield the last (not full) chunk yield chunk_work
[docs] def load_error_model(mode, seed, model, fragment_length, fragment_length_sd, store_mutations): """ Load the error model based on the specified mode and parameters. Args: mode (str): The mode of the error model. Possible values are 'kde', 'basic', and 'perfect'. seed (int): The random seed to use for generating random numbers. model (str): The model to use for the error model. Only applicable for the 'kde' mode. fragment_length (float): The mean fragment length for the error model. fragment_length_sd (float): The standard deviation of the fragment length for the error model. Returns: err_mod: The loaded error model based on the specified mode and parameters. """ logger = logging.getLogger(__name__) logger.info("Using %s ErrorModel" % mode) precomputed_error_models = { "hiseq": os.path.join(os.path.dirname(__file__), "profiles/HiSeq"), "novaseq": os.path.join(os.path.dirname(__file__), "profiles/NovaSeq"), "miseq": os.path.join(os.path.dirname(__file__), "profiles/miSeq_0.npz"), "miseq-20": os.path.join(os.path.dirname(__file__), "profiles/miSeq_20.npz"), "miseq-24": os.path.join(os.path.dirname(__file__), "profiles/miSeq_24.npz"), "miseq-28": os.path.join(os.path.dirname(__file__), "profiles/miSeq_28.npz"), "miseq-32": os.path.join(os.path.dirname(__file__), "profiles/miSeq_32.npz"), "miseq-36": os.path.join(os.path.dirname(__file__), "profiles/miSeq_36.npz"), "nextseq": os.path.join(os.path.dirname(__file__), "profiles/nextSeq.npz"), } if fragment_length is not None and fragment_length_sd is not None: logger.info( f"Using custom fragment length {fragment_length} and default fragment length sd {fragment_length_sd}" ) elif bool(fragment_length) ^ bool(fragment_length_sd): logger.error("fragment_length and fragment_length_sd must be specified together") sys.exit(1) if seed: logger.info("Setting random seed to %i" % seed) random.seed(seed) np.random.seed(seed) if mode == "kde": if model is None: logger.error("--model is required in --mode kde") sys.exit(1) elif model.lower() in precomputed_error_models: npz = precomputed_error_models[model.lower()] else: npz = model err_mod = kde.KDErrorModel(npz, fragment_length, fragment_length_sd, store_mutations) elif mode == "basic": if model is not None: logger.warning("--model %s will be ignored in --mode %s" % (model, mode)) err_mod = basic.BasicErrorModel(fragment_length, fragment_length_sd, store_mutations) elif mode == "perfect": if model is not None: logger.warning("--model %s will be ignored in --mode %s" % (model, mode)) err_mod = perfect.PerfectErrorModel(fragment_length, fragment_length_sd) return err_mod
[docs] def load_genomes(genomes, draft, ncbi, n_genomes_ncbi, output, n_genomes): """Load genomes from different sources and concatenate them into a single file. Args: genomes (list): List of paths to genome files. draft (list): List of paths to draft genome files. ncbi (bool): Flag indicating whether to download genomes from NCBI. n_genomes_ncbi (list): List of tuples specifying the NCBI genome IDs and the number of genomes to download. output (str): Path to the output file. n_genomes (int): Number of genomes to select randomly from the concatenated genome file. Returns: tuple: A tuple containing the number of genomes and the path to the concatenated genome file. """ logger = logging.getLogger(__name__) if not (genomes or draft or ncbi): logger.error("One of --genomes/-g, --draft, --ncbi/-k is required") sys.exit(1) genome_files = [] if genomes: genome_files.extend(genomes) if draft: genome_files.extend(draft) if ncbi and n_genomes_ncbi: util.genome_file_exists(output + "_ncbi_genomes.fasta") if len(*ncbi) != len(*n_genomes_ncbi): logger.error( "--ncbi and --n_genomes_ncbi of unequal lengths. \ Aborting" ) sys.exit(1) for g, n in zip(*ncbi, *n_genomes_ncbi): genomes_ncbi = download.ncbi(g, n, output + "_ncbi_genomes.fasta") genome_files.append(genomes_ncbi) if ncbi and not n_genomes_ncbi: logger.error("--ncbi/-k requires --n_genomes_ncbi/-U. Aborting.") sys.exit(1) genome_file = output + ".iss.tmp.genomes.fasta" util.concatenate(genome_files, output=genome_file) # for n_genomes we use reservoir sampling to draw random genomes # from the concatenated genome file. We then override the file. if n_genomes and not draft and not ncbi: genome_count = util.count_records(genome_file) genome_files = [genome for genome in util.reservoir(SeqIO.parse(genome_file, "fasta"), genome_count, n_genomes)] SeqIO.write(genome_files, genome_file, "fasta") if os.stat(genome_file).st_size == 0: logger.error("Genome(s) file seems empty: %s" % genome_file) sys.exit(1) try: f = open(genome_file, "r") with f: # count the number of records genome_list = util.count_records(f) except IOError as e: logger.error("Failed to open genome(s) file:%s" % e) sys.exit(1) return genome_list, genome_file
[docs] def load_readcount_or_abundance( readcount_file, abundance_file, coverage_file, coverage, abundance_distribution, draft, genome_list, genome_file, n_reads, output, error_model, ): """Load readcount or abundance information based on the provided input parameters. Args: readcount_file (str): Path to the readcount file. abundance_file (str): Path to the abundance file. coverage_file (str): Path to the coverage file. coverage (str): Coverage distribution type. abundance_distribution (str): Abundance distribution type. draft (str): Draft mode. genome_list (list): List of genomes. genome_file (str): Path to the genome file. n_reads (int): Number of reads. output (str): Output file path. error_model (object): Error model object. Returns: tuple: A tuple containing the readcount dictionary and abundance dictionary. """ logger = logging.getLogger(__name__) abundance_dispatch = { "uniform": abundance.uniform, "halfnormal": abundance.halfnormal, "exponential": abundance.exponential, "lognormal": abundance.lognormal, "zero_inflated_lognormal": abundance.zero_inflated_lognormal, } readcount_dic = None abundance_dic = None if readcount_file: logger.info("Using readcount file:%s" % readcount_file) logger.warning("--readcount_file disables --n_reads, n_reads will be calculated from the readcount file") if draft: raise RuntimeError("readcount_file is only supported using --genomes, not --draft") readcount_dic = abundance.parse_readcount_file(readcount_file) elif abundance_file: logger.info("Using abundance file:%s" % abundance_file) if draft: abundance_dic_short = abundance.parse_abundance_file(abundance_file) complete_genomes_dic = {k: v for k, v in abundance_dic_short.items() if k not in draft} draft_dic = abundance.expand_draft_abundance(abundance_dic_short, draft) abundance_dic = {**complete_genomes_dic, **draft_dic} else: abundance_dic = abundance.parse_abundance_file(abundance_file) elif coverage_file: logger.warning("--coverage_file is an experimental feature") logger.warning("--coverage_file disables --n_reads") logger.info("Using coverage file:%s" % coverage_file) if draft: coverage_dic = abundance.parse_abundance_file(coverage_file) complete_genomes_dic = {k: v for k, v in coverage_dic.items() if k not in draft} draft_dic = abundance.expand_draft_abundance(coverage_dic, draft, mode="coverage") abundance_dic = {**complete_genomes_dic, **draft_dic} else: abundance_dic = abundance.parse_abundance_file(coverage_file) elif coverage in abundance_dispatch: # todo coverage distribution with --draft logger.info("Using %s coverage distribution" % coverage) if draft: abundance_dic = abundance.draft( genome_list, draft, abundance_dispatch[abundance_distribution], output, mode="coverage", ) else: abundance_dic = abundance_dispatch[coverage](genome_list) if n_reads: n_reads = util.convert_n_reads(n_reads) logger.info("scaling coverage to %s reads" % n_reads) abundance_dic = abundance.coverage_scaling(n_reads, abundance_dic, genome_file, error_model.read_length) abundance.to_file(abundance_dic, output, mode="coverage") elif abundance_distribution in abundance_dispatch: logger.info("Using %s abundance distribution" % abundance_distribution) if draft: abundance_dic = abundance.draft( genome_list, draft, abundance_dispatch[abundance_distribution], output, ) else: abundance_dic = abundance_dispatch[abundance_distribution](genome_list) abundance.to_file(abundance_dic, output) else: logger.error("Could not get abundance, or coverage or readcount information") sys.exit(1) return readcount_dic, abundance_dic
[docs] def write_mutations(mutations, mutations_handle): """Write mutations to a file Args: mutations (list): List of mutations. mutations_handle (file): File handle to write the mutations to. """ for vcf_dict in mutations: mutations_handle.write( "\t".join( [ str(vcf_dict["id"]), str(vcf_dict["position"] + 1), # vcf files have 1-based index ".", vcf_dict["ref"], str(vcf_dict["alt"]), str(vcf_dict["quality"]), "", "", ] ) + "\n" )