Source code for plastid.genomics.genome_hash

"""This module contains tools for lookup of features in a region of interest within the a genome.

.. contents::
   :local:

Summary
-------

It is frequently useful to retrieve features that overlap specific regions 
of interest in the genome. ``GenomeHashes`` index features by location,
providing quick lookup.


Module contents
---------------

Several implementations are provided, depending how the data are formatted:

====================    =========================================================
**Implementation**      **Format of feature data**
--------------------    ---------------------------------------------------------
|GenomeHash|            Objects in memory or in unindexed `BED`_, `GTF2`_,
                        `GFF3`_, or `PSL`_ files

|BigBedGenomeHash|      Annotations in `BigBed`_ files

|TabixGenomeHash|       Annotations in `tabix`_-compressed `BED`_, `GTF2`_,
                        `GFF3`_, or `PSL`_ files
====================    =========================================================


Examples
--------
Create a |GenomeHash|::

    >>> from plastid import *

    # from objects in memory
    >>> one_hash = GenomeHash(list_of_transcripts)
    
    # from a non-indexed file
    >>> my_hash = GenomeHash(list(GFF3_Reader("some_file.gff")))
    
    # from a BigBed file
    >>> bigbed_hash = BigBedGenomeHash("some_file.bb")
    
    # from tabix-compressed BED file
    >>> tabix_hash = TabixGenomeHash("some_file.bed.gz","BED")


To find features overlapping a region of interest, pass the feature coordinates
to a |GenomeHash| as a |GenomicSegment|, |SegmentChain|, or |Transcript|::

    >>> overlapping = my_hash[GenomicSegment("chrII",50,10000,"+")]
    >>> overlapping
    [ list of SegmentChains / Transcripts, et c ]
    
    # SegmentChains & Transcripts can also be keys:
    >>> tx = Transcript(GenomicSegment("chrII",50,300,"+"),GenomicSegment("chrII",9000,10000,"+"))
    >>> overlapping2 = my_hash[tx]
    >>> overlapping2
    [ list of SegmentChains / Transcripts, et c ]
    
    # find features that overlap `roi` on either strand
    >>> either_strand_overlap = my_hash.get_overlapping_features(roi,stranded=False)
    
"""

#===============================================================================
# Memory-efficient ways to hash features across a genome
#===============================================================================
import copy
from plastid.util.services.mini2to3 import cStringIO
from plastid.util.io.openers import NullWriter, multiopen
from plastid.readers.bed import BED_Reader
from plastid.readers.gff import GTF2_Reader, GFF3_Reader
from plastid.readers.psl import PSL_Reader
from plastid.genomics.roitools import GenomicSegment, SegmentChain
from abc import abstractmethod

DEFAULT_BIN_SIZE = 20000


class AbstractGenomeHash(object):
    """Abstract base class for objects that index |SegmentChains| by genomic position,
    allowing quick lookup for comparisons of overlap or other behavior
    """

    @abstractmethod
    def get_overlapping_features(self, roi, stranded=True):
        """Return list of features that overlap `roi`
        
        Parameters
        ----------
        feature : |GenomicSegment| or |SegmentChain|
            Query feature

        stranded : bool
            if `True`, retrieve only features on same strand as query feature.
            Otherwise, retrieve features on both strands (Default: `True`)
                             
                             
        Returns
        -------
        list
            List of overlapping features.


        Raises
        ------
        TypeError
            if `roi` is not a |GenomicSegment| or |SegmentChain|
        """
        pass

    @abstractmethod
    def __getitem__(self, roi):
        """Return list of features that overlap a region of interest (`roi`)
        on the same strand
        
        Parameters
        ----------
        roi : |GenomicSegment| or |SegmentChain|
            Query feature indicating region of interest

                             
        Returns
        -------
        list
            Overlapping features


        Raises
        ------
        TypeError
            if feature is not a |GenomicSegment| or |SegmentChain|
        """
        pass


[docs]class GenomeHash(AbstractGenomeHash): """Index memory-resident features (e.g. |SegmentChains| or |Transcripts|) by genomic position for quick lookup later. Parameters ---------- features : dict or list, optional dict or list of features, as |SegmentChain| objects or subclasses (Default: `[]`) binsize : int, optional Size in nucleotides of *neighborhood* for hash. (Default: %s) do_copy : bool If `True`, features will be copied before being stored in the hash. This comes at a speed cost, but will prevent unexpected side effects if the features are being changed outside the hash. If `False` (default), creation of the |GenomeHash| will be much faster. Notes ----- Because all features are stored in memory, for large genomes, a |TabixGenomeHash| or |BigBedGenomeHash| is much more memory-efficient. """ def __init__(self, features=None, binsize=DEFAULT_BIN_SIZE, do_copy=False): """Create a |GenomeHash| Parameters ---------- features : dict or list, optional dict or list of features, as |SegmentChain| objects or subclasses (Default: `[]`) binsize : int, optional Size in nucleotides of *neighborhood* for hash. (Default: %s) do_copy : bool If `True`, features will be copied before being stored in the hash. This comes at a speed cost, but will prevent unexpected side effects if the features are being changed outside the hash. If `False` (default), creation of the |GenomeHash| will be much faster. """ % DEFAULT_BIN_SIZE self.copy = do_copy self.feature_dict = {} self._id_to_names = {} self.binsize = binsize features = [] if features is None else features self.update(features) def __repr__(self): return "<%s features=%s binsize=%s chrs=%s>" % ( self.__class__.__name__, len(self.feature_dict), self.binsize, self._feature_hash.keys() ) def __str__(self): return repr(self)
[docs] def update(self, features): """Add features to the |GenomeHash| Parameters ---------- features : dict or list dict or list of features, as |SegmentChain| objects or subclasses """ try: start = 1 + max(self._id_to_names.keys()) except ValueError: # if empty dictionary start = 0 if isinstance(features, dict): for n, (feature_name, feature) in enumerate(features.items()): feature_id = start + n self._id_to_names[feature_id] = feature_name if self.copy == True: self.feature_dict[feature_id] = copy.deepcopy(feature) else: self.feature_dict[feature_id] = feature else: for n, feature in enumerate(features): feature_id = start + n feature_name = feature.get_name() self._id_to_names[feature_id] = feature_name if self.copy == True: self.feature_dict[feature_id] = copy.deepcopy(feature) else: self.feature_dict[feature_id] = feature self._feature_hash = self._make_hash()
def _make_hash(self): """Create the |GenomeHash| Returns ------- dict Hierarchichal dictionary: `dict[chrom][strand] = list<str>` """ my_hash = {} for feature_id, feature in self.feature_dict.items(): bins = self._get_hash_bins(feature) chrom = feature.spanning_segment.chrom strand = feature.spanning_segment.strand if chrom not in my_hash.keys(): my_hash[chrom] = {} my_hash[chrom]["+"] = {} my_hash[chrom]["-"] = {} for b in bins: try: my_hash[chrom][strand][b].append(feature_id) except KeyError: my_hash[chrom][strand][b] = [feature_id] return my_hash def _get_hash_bins(self, roi): """Returns a list of genome bins that a given roi falls into Parameters ---------- feature : |GenomicSegment| or |SegmentChain| Query feature Returns ------- list<int> List of hash bins that `roi` covers Raises ------ TypeError if `roi` is not a |GenomicSegment| or |SegmentChain| """ if isinstance(roi, GenomicSegment): rois = [roi] elif isinstance(roi, SegmentChain): rois = roi else: raise TypeError("Query feature must be a GenomicSegment or SegmentChain") bins = [] for seg in rois: bin_start = seg.start // self.binsize bin_end = seg.end // self.binsize bins.extend(range(bin_start, bin_end + 1)) return list(set(bins)) def _get_nearby_feature_ids(self, roi, stranded=True): """Return unique IDs of |SegmentChain| s in all the bins occupied by `roi` Parameters ---------- roi : |GenomicSegment| or |SegmentChain| Query feature stranded : bool If `True`, retrieve only features on same strand as query feature. Otherwise, retrieve features on both strands Returns ------- list<str> Unique IDs of features near (within *self.binsize* distance from) `roi` Raises ------ TypeError if `roi` is not |GenomicSegment| or |SegmentChain| """ if isinstance(roi, GenomicSegment): iv = roi elif isinstance(roi, SegmentChain): iv = roi.spanning_segment else: raise TypeError("Query feature must be a GenomicSegment or SegmentChain") feature_ranges = self._get_hash_bins(roi) nearby_feature_ids = [] try: nearby_hash = self._feature_hash[iv.chrom][iv.strand] except KeyError: nearby_hash = {} for b in feature_ranges: try: nearby_feature_ids.extend(nearby_hash[b]) except KeyError: pass if stranded is False: other_strand = "+" if iv.strand == "-" else "-" if iv.strand == "+" else "." try: nearby_hash = self._feature_hash[iv.chrom][other_strand] except KeyError: nearby_hash = {} for b in feature_ranges: try: nearby_feature_ids.extend(nearby_hash[b]) except KeyError: pass nearby_feature_ids = set(nearby_feature_ids) return nearby_feature_ids
[docs] def get_nearby_features(self, roi, stranded=True): """Return list of features in all the bins occupied by `roi` Parameters ---------- roi : |GenomicSegment| or |SegmentChain| Query feature stranded : bool if `True`, retrieve only features on same strand as query feature. Otherwise, retrieve features on both strands Returns ------- list<str> Features near (within `self.binsize` distance from) `roi` Raises ------ TypeError if `roi` is not a |GenomicSegment| or |SegmentChain| """ nearby_feature_ids = self._get_nearby_feature_ids(roi, stranded=stranded) return [self.feature_dict[X] for X in nearby_feature_ids]
[docs] def get_nearby_feature_names(self, roi, stranded=True): """Return list of the names of features in all the bins occupied by `roi` Parameters ---------- roi : |GenomicSegment| or |SegmentChain| Query feature stranded : bool if `True`, retrieve only features on same strand as query feature. Otherwise, retrieve features on both strands Returns ------- list Names of features near (within `self.binsize` distance from) `roi` Raises ------ TypeError if `roi` is not a |GenomicSegment| or |SegmentChain| """ nearby_feature_ids = self._get_nearby_feature_ids(roi, stranded=stranded) return [self._id_to_names[X] for X in nearby_feature_ids]
[docs] def get_overlapping_features(self, roi, stranded=True): """Return list of features overlapping `roi`. Parameters ---------- roi : |GenomicSegment| or |SegmentChain| Query feature indicating region of interest stranded : bool if `True`, retrieve only features on same strand as query feature. Otherwise, retrieve features on both strands Returns ------- list Features overlapping `roi` Raises ------ TypeError if `roi` is not a |GenomicSegment| or |SegmentChain| """ nearby_features = self.get_nearby_features(roi, stranded=stranded) if isinstance(roi, GenomicSegment): roi = SegmentChain(roi) if stranded == False: fn = roi.unstranded_overlaps else: fn = roi.overlaps return [X for X in nearby_features if fn(X) == True]
def __getitem__(self, roi): """Return list of features that overlap a region of interest (roi), on same strand. Parameters ---------- roi : |GenomicSegment| or |SegmentChain| Query feature indicating region of interest stranded : bool if `True`, retrieve only features on same strand as query feature. Otherwise, retrieve features on both strands Returns ------- list Features overlapping `roi` Raises ------ TypeError if `roi` is not a |GenomicSegment| |SegmentChain| """ return self.get_overlapping_features(roi, stranded=True)
[docs]class BigBedGenomeHash(AbstractGenomeHash): """BigBedGenomeHash(*filenames,return_type=SegmentChain) Find features overlapping query regions in `BigBed`_ files. Parameters ---------- *filenames : str One or more filenames to open (NOT open filehandles) return_type : class implementing a :py:meth:`from_bed` method Class of object to return (Default: |SegmentChain|) Attributes ---------- bigbedreaders : |BigBedReader| |BigBedReaders| connecting to BigBed file(s) """ def __init__(self, *filenames, **kwargs): """Create a |BigBedGenomeHash| Parameters ---------- *filenames : str One or more filenames to open (NOT open filehandles) return_type : class implementing a :py:meth:`from_bed` method Class of object to return (Default: |SegmentChain|) """ from plastid.readers.bigbed import BigBedReader return_type = kwargs.get("return_type", SegmentChain) filenames = list(multiopen(filenames)) for filename in filenames: if not isinstance(filename, str): raise ValueError("`filename` must be a 'str'. Found a '%s'." % type(filename)) self.filenames = filenames self.bigbedreaders = [BigBedReader(X, return_type=return_type) for X in filenames]
[docs] def get_overlapping_features(self, roi, stranded=True): """Return list of features overlapping `roi` Parameters ---------- roi : |GenomicSegment| or |SegmentChain| Query feature indicating region of interest stranded : bool If `True`, retrieve only features on same strand as query feature. Otherwise, retrieve features on both strands Returns ------- list Features that overlap `roi` Raises ------ TypeError if `roi` is not a |GenomicSegment| or |SegmentChain| """ ltmp = [] for reader in self.bigbedreaders: ltmp.extend(reader.get(roi, stranded=stranded)) return ltmp
def __getitem__(self, roi, stranded=True): """Return list of features that overlap the region of interest (roi) Parameters ---------- roi : |GenomicSegment| or |SegmentChain| Query feature indicating region of interest stranded : bool If `True`, retrieve only features on same strand as query feature. Otherwise, retrieve features on both strands Returns ------- list Features that overlap `roi` Raises ------ TypeError if `roi` is not a |GenomicSegment| or |SegmentChain| """ return self.get_overlapping_features(roi, stranded=stranded)
[docs]class TabixGenomeHash(AbstractGenomeHash): """ TabixGenomeHash(*filenames,data_format='GTF2') Find features overlapping query regions in `Tabix`_-indexed files. Parameters ---------- filenames : str or list of str Filename or list of filenames of `Tabix`_-compressed files data_format : str Format of tabix-compressed file(s). Choices are: `'GTF2'`,`'GFF3'`,`'BED'`,`'PSL'` (Default: `GTF2`) Attributes ---------- filenames : str or list Name of file to open or list of filenames to open (NOT open filehandles) tabix_readers : list of :py:class:`pysam.Tabixfile` `Pysam`_ interfaces to underlying data files """ _READERS = { "GTF2": GTF2_Reader, "GFF3": GFF3_Reader, "BED": BED_Reader, "PSL": PSL_Reader, } def __init__(self, *filenames, **kwargs): """Create a |BigBedGenomeHash| Parameters ---------- filenames : str or list of str Filename or list of filenames of `Tabix`_-compressed files data_format : str Format of tabix-compressed file(s). Choices are: `'GTF2'`,`'GFF3'`,`'BED'`,`'PSL'` (Default: `GTF2`) """ from pysam import Tabixfile if len(filenames) == 1 and isinstance(filenames[0], list): filenames = filenames[0] self.filenames = list(multiopen(filenames)) self.printer = kwargs.get("printer", NullWriter()) data_format = kwargs.get("data_format", "GTF2") try: self._reader_class = TabixGenomeHash._READERS[data_format] except ValueError: msg = "Supported file formats for TabixGenomeHash are: %s" % ", ".join( sorted(TabixGenomeHash._READERS.keys()) ) self.printer.write(msg) raise ValueError(msg) self.tabix_readers = [Tabixfile(X) for X in self.filenames] def __del__(self): try: for reader in self.tabix_readers: try: reader.close() except: pass except: pass
[docs] def get_overlapping_features(self, roi, stranded=True): """Return list of features overlapping `roi` Parameters ---------- roi : |GenomicSegment| or |SegmentChain| Query feature indicating region of interest stranded : bool If `True`, retrieve only features on same strand as query feature. Otherwise, retrieve features on both strands Returns ------- list Features that overlap `roi` Raises ------ TypeError if `roi` is not a |GenomicSegment| or |SegmentChain| """ return list(self.__getitem__(roi, stranded=stranded))
def __getitem__(self, roi, stranded=True): """Return list of features that overlap the region of interest (roi). Parameters ---------- roi : |GenomicSegment| or |SegmentChain| Query feature indicating region of interest stranded : bool If `True`, retrieve only features on same strand as query feature. Otherwise, retrieve features on both strands Returns ------- list Features that overlap `roi` Raises ------ TypeError if `roi` is not a |GenomicSegment| or |SegmentChain| """ if isinstance(roi, GenomicSegment): roi_seg = roi roi_chain = SegmentChain(roi) elif isinstance(roi, SegmentChain): roi_chain = roi roi_seg = roi.spanning_segment else: raise TypeError("Query feature must be a GenomicSegment or SegmentChain") chrom = roi_seg.chrom feature_text = "\n".join(["\n".join(list(R.fetch(chrom, X.start, X.end))) \ for R in self.tabix_readers \ for X in roi_chain]) features = (self._reader_class(cStringIO.StringIO(feature_text))) if stranded == True: features = [X for X in features if roi_chain.overlaps(X)] else: features = [X for X in features if roi_chain.unstranded_overlaps(X)] return features