Source code for plastid.bin.phase_by_size

#!/usr/bin/env python
"""Estimate :term:`sub-codon phasing` in a :term:`ribosome profiling` dataset,
stratified by read length.

Because ribosomes step three nucleotides in each cycle of translation elongation,
in many :term:`ribosome profiling` datasets a triplet periodicity is observable
in the distribution of :term:`ribosome-protected footprints <footprint>`.

In a good dataset, 70-90% of the reads on a codon fall within the first of the
three codon positions. This allows deduction of translation reading frames, if
the reading frame is not known *a priori.* See :cite:`Ingolia2009` for more
details.

Output files
------------
    OUTBASE_phasing.txt
        Read phasing for each read length

    OUTBASE_phasing.svg
        Plot of phasing by read length

where `OUTBASE` is supplied by the user.

 .. note::

    To avoid double-counting of codons, users should either use an *ROI file*
    made by the ``generate`` subprogram of the |metagene| script, or supply
    an :term:`annotation` file that includes only one transcript isoform per
    gene.

"""
import sys
import argparse
import inspect
import warnings

import pandas as pd
import numpy
import matplotlib
matplotlib.use("Agg")
from plastid.util.scriptlib.argparsers import (
    AlignmentParser,
    AnnotationParser,
    PlottingParser,
    BaseParser,
)
from plastid.util.io.openers import get_short_name, argsopener, read_pl_table
from plastid.util.io.filters import NameDateWriter
from plastid.util.scriptlib.help_formatters import format_module_docstring
from plastid.util.services.exceptions import DataWarning, ArgumentWarning
from plastid.plotting.plots import phase_plot
from plastid.genomics.roitools import SegmentChain

warnings.simplefilter("once")
printer = NameDateWriter(get_short_name(inspect.stack()[-1][1]))


[docs]def roi_row_to_cds(row): """Helper function to extract coding portions from maximal spanning windows flanking CDS starts that are created by |metagene| ``generate`` subprogram. Parameters ---------- row : (int, Series) Row from a :class:`pandas.DataFrame` of an ROI file made by the |metagene| ``generate`` subprogram Returns ------- |SegmentChain| Coding portion of maximal spanning window """ chainstr, alignment_offset, zero_point = row[1][["region", "alignment_offset", "zero_point"]] chain = SegmentChain.from_str(chainstr) cds_start = zero_point - alignment_offset subchain = chain.get_subchain(cds_start, chain.length) return subchain
[docs]def main(argv=sys.argv[1:]): """Command-line program Parameters ---------- argv : list, optional A list of command-line arguments, which will be processed as if the script were called from the command line if :py:func:`main` is called directly. Default: `sys.argv[1:]`. The command-line arguments, if the script is invoked from the command line """ al = AlignmentParser( disabled=["normalize", "big_genome", "spliced_bowtie_files"], input_choices=["BAM"], ) an = AnnotationParser() pp = PlottingParser() bp = BaseParser() plotting_parser = pp.get_parser() alignment_file_parser = al.get_parser(conflict_handler="resolve") annotation_file_parser = an.get_parser(conflict_handler="resolve") base_parser = bp.get_parser() parser = argparse.ArgumentParser( description=format_module_docstring(__doc__), formatter_class=argparse.RawDescriptionHelpFormatter, conflict_handler="resolve", parents=[base_parser, annotation_file_parser, alignment_file_parser, plotting_parser] ) parser.add_argument("roi_file",type=str,nargs="?",default=None, help="Optional. ROI file of maximal spanning windows surrounding start codons, "+\ "from ``metagene generate`` subprogram. Using this instead of `--annotation_files` "+\ "prevents double-counting of codons when multiple transcript isoforms exist "+\ "for a gene. See the documentation for `metagene` for more info about ROI files."+\ "If an ROI file is not given, supply an annotation with ``--annotation_files``") parser.add_argument("outbase", type=str, help="Required. Basename for output files") parser.add_argument( "--codon_buffer", type=int, default=5, help="Codons before and after start codon to ignore (Default: 5)" ) args = parser.parse_args(argv) bp.get_base_ops_from_args(args) pp.set_style_from_args(args) gnd = al.get_genome_array_from_args(args, printer=printer) read_lengths = list(range(args.min_length, args.max_length + 1)) codon_buffer = args.codon_buffer dtmp = { "read_length": numpy.array(read_lengths), "reads_counted": numpy.zeros_like(read_lengths, dtype=int), } if args.roi_file is not None: using_roi = True roi_table = read_pl_table(args.roi_file) regions = roi_table.iterrows() transform_fn = roi_row_to_cds back_buffer = -1 if len(args.annotation_files) > 0: warnings.warn( "If an ROI file is given, annotation files are ignored. Pulling regions from '%s'. Ignoring '%s'" % (args.roi_file, ", ".join(args.annotation_files)), ArgumentWarning ) else: using_roi = False if len(args.annotation_files) == 0: printer.write("Either an ROI file or at least annotation file must be given.") sys.exit(1) else: warnings.warn( "Using a transcript annotation file instead of an ROI file can lead to double-counting of codons if the annotation contains multiple transcripts per gene.", ArgumentWarning ) regions = an.get_transcripts_from_args(args, printer=printer) back_buffer = -codon_buffer transform_fn = lambda x: x.get_cds() phase_sums = {} for k in read_lengths: phase_sums[k] = numpy.zeros(3) for n, roi in enumerate(regions): if n % 1000 == 1: printer.write("Counted %s ROIs ..." % n) # transformation needed to extract CDS from transcript or from ROI file window cds_part = transform_fn(roi) # only calculate for coding genes if len(cds_part) > 0: read_dict = {} count_vectors = {} for k in read_lengths: read_dict[k] = [] count_vectors[k] = [] # for each seg, fetch reads, sort them, and create individual count vectors for seg in cds_part: reads = gnd.get_reads(seg) for read in filter(lambda x: len(x.positions) in read_dict, reads): read_dict[len(read.positions)].append(read) # map and sort by length for read_length in read_dict: count_vector = list(gnd.map_fn(read_dict[read_length], seg)[1]) count_vectors[read_length].extend(count_vector) # add each count vector for each length to total for k, vec in count_vectors.items(): counts = numpy.array(vec) if cds_part.strand == "-": counts = counts[::-1] if len(counts) % 3 == 0: counts = counts.reshape((int(len(counts) / 3), 3)) else: if using_roi == False: message = "Length of '%s' coding region (%s nt) is not divisible by 3. Ignoring last partial codon." % ( roi.get_name(), len(counts) ) warnings.warn(message, DataWarning) newlen = int(len(counts) // 3) counts = counts[:3 * newlen] counts = counts.reshape(newlen, 3) phase_sums[k] += counts[codon_buffer:back_buffer, :].sum(0) printer.write("Counted %s ROIs total." % (n + 1)) for k in dtmp: dtmp[k] = numpy.array(dtmp[k]) # total reads counted for each size for k in read_lengths: dtmp["reads_counted"][dtmp["read_length"] == k] = phase_sums[k].sum() # read length distribution dtmp["fraction_reads_counted" ] = dtmp["reads_counted"].astype(float) / dtmp["reads_counted"].sum() # phase vectors phase_vectors = {K: V.astype(float) / V.astype(float).sum() for K, V in phase_sums.items()} for i in range(3): dtmp["phase%s" % i] = numpy.zeros(len(dtmp["read_length"])) for k, vec in phase_vectors.items(): for i in range(3): dtmp["phase%s" % i][dtmp["read_length"] == k] = vec[i] # phase table fn = "%s_phasing.txt" % args.outbase printer.write("Saving phasing table to %s ..." % fn) dtmp = pd.DataFrame(dtmp) with argsopener(fn, args) as fh: dtmp.to_csv( fh, columns=[ "read_length", "reads_counted", "fraction_reads_counted", "phase0", "phase1", "phase2", ], float_format="%.6f", na_rep="nan", sep="\t", index=False, header=True ) fh.close() fig = {} if args.figsize is not None: fig["figsize"] = tuple(args.figsize) colors = pp.get_colors_from_args(args, len(read_lengths)) fn = "%s_phasing.%s" % (args.outbase, args.figformat) printer.write("Plotting to %s ..." % fn) plot_counts = numpy.vstack([V for (_, V) in sorted(phase_sums.items())]) fig, (ax1,_) = phase_plot(plot_counts,labels=read_lengths,lighten_by=0.3, cmap=None,color=colors,fig=fig) if args.title is not None: ax1.set_title(args.title) else: ax1.set_title("Phasing stats for %s" % args.outbase) fig.savefig(fn, dpi=args.dpi, bbox_inches="tight")
if __name__ == "__main__": main()