Source code for plastid.readers.wiggle

#!/usr/bin/env python
"""A single reader for fixedStep `wiggle`_, variableStep `wiggle`_, and `bedGraph`_ files.

|WiggleReader| is seldom used directly. Typically it is used internally by
|GenomeArray| and |SparseGenomeArray| when their
:meth:`~plastid.genomics.genome_array.GenomeArray.add_from_wiggle` methods
are called.

See also
--------
`UCSC file format FAQ <http://genome.ucsc.edu/FAQ/FAQformat.html>`_
    UCSC Wiggle and bedGraph file specification

|GenomeArray| and |SparseGenomeArray|
    Array-like objects that store and index quantitative data over genomes
"""


class WiggleReader(object):
    """Read `wiggle`_ and `bedGraph`_ files line-by-line, returning tuples of
    `(chromosome, start position, stop position, value)`.

    Tuple coordinates are zero-indexed and half-open, regardless of whether
    the file is a `wiggle`_ or `bedGraph`_.

    See the `UCSC file format FAQ <http://genome.ucsc.edu/FAQ/FAQformat.html>`_
    for details.

    Parameters
    ----------
    fh : file-like
        Open filehandle pointing to `wiggle`_ or `bedGraph`_ data

    Attributes
    ----------
    file_info : dict
        Key-value pairs parsed from the most recent ``track`` line,
        or an empty dict if no ``track`` line has been read yet
    """

    def __init__(self, fh):
        """
        Parameters
        ----------
        fh : file-like
            Open filehandle pointing to `wiggle`_ or `bedGraph`_ data
        """
        self.fh = fh
        self.data_format = "bedGraph"
        # Defined up front so the attribute exists even for files that
        # carry no `track` line.
        self.file_info = {}
        self._reset()

    def _reset(self):
        """Reset positional counters"""
        self.chrom = None
        self.step = 1
        self.span = 1  # default span is 1 (e.g. things span 1 base)
        self.counter = 1  # chromosomal positions are 1-indexed

    def __iter__(self):
        return self

    def _get_lineinfo(self, line):
        """Determine line type and return a dictionary containing key-value
        pairs of any parameters defined in the line.

        The line type is returned under the key `'line_type'`, and is one of
        `'fileHeader'` (``track`` lines), `'dataHeader'` (``variableStep`` /
        ``fixedStep`` declarations), `'browser'` (``browser`` lines),
        `'bedGraph'` (any other four-column line), or `'data'`.

        For header lines, `key=value` items are returned as key-value pairs,
        and bare items are returned as flags with the value `'true'`. Parsing
        of a header stops at the first item containing `'description'`.

        Parameters
        ----------
        line : str
            A non-blank line of `Wiggle`_ or `bedGraph`_ file

        Returns
        -------
        dict
            `'line_type'` key describes the type of line. If a header line,
            also contains whatever parameters the line declares
            (e.g. `'chrom'`, `'start'`, `'step'`, `'span'`)
        """
        items = line.split()
        keyword = items[0]
        if keyword == "track":
            line_type = "fileHeader"
        elif keyword in ("variableStep", "fixedStep"):
            line_type = "dataHeader"
        elif keyword == "browser":
            # Browser display directives carry no data; without this branch a
            # four-token browser line would be mistaken for bedGraph data.
            line_type = "browser"
        elif len(items) == 4:
            line_type = "bedGraph"
        else:
            line_type = "data"

        info = {"line_type": line_type}
        if "Header" in line_type:
            for item in items[1:]:
                if "description" in item:
                    break
                if "=" in item:
                    # Split on the first `=` only, so values that themselves
                    # contain `=` (e.g. URLs) are kept intact.
                    key, val = item.split("=", 1)
                    info[key] = val
                else:
                    # Bare flag: record it under its own name rather than
                    # clobbering the previously parsed key.
                    info[item] = "true"
                # NOTE(review): quoted values containing spaces are still
                # split on whitespace, so they are not parsed as one value.
        return info

    def _next_line(self):
        return next(self.fh)

    def __next__(self):
        return self.next()

    def next(self):
        """Return a tuple of `(chromosome, start, stop, value)` for the next
        data line.

        Header, browser, comment, and blank lines are processed internally
        and not exposed to the user. All coordinates are returned as 0-based,
        half-open intervals, following Python conventions.

        Returns
        -------
        str
            chromosome name

        int
            start position, 0-indexed

        int
            end position, 0-indexed, half-open

        float
            value on chromosome between start and end

        Raises
        ------
        StopIteration
            When the underlying filehandle is exhausted
        ValueError
            If a coordinate or value field cannot be converted to a number
        """
        while True:
            line = self._next_line()
            stripped = line.strip()
            # Skip blank lines (including empty strings) and comments.
            if not stripped or stripped.startswith("#"):
                continue

            line_info = self._get_lineinfo(stripped)
            line_type = line_info["line_type"]
            line_items = stripped.split()

            if line_type == "browser":
                continue

            if line_type == "fileHeader":
                self.file_info = line_info
                continue

            if line_type == "bedGraph":
                # bedGraph is 0-indexed, half open
                self._reset()
                self.data_format = "bedGraph"
                chrom = line_items[0]
                start = int(line_items[1])
                stop = int(line_items[2])
                val = float(line_items[3])
                return (chrom, start, stop, val)

            if line_type == "dataHeader":
                self._reset()
                self.data_format = line_items[0]
                if "chrom" in line_info:
                    self.chrom = line_info["chrom"]
                if "span" in line_info:
                    self.span = int(line_info["span"])
                if "step" in line_info:
                    self.step = int(line_info["step"])
                if "start" in line_info:
                    self.counter = int(line_info["start"])
                continue

            # Remaining case: a wiggle data line.
            # variableStep and fixedStep are 1-indexed
            # variableStep is also fully-closed
            if self.data_format == "variableStep":
                start = int(line_items[0]) - 1
                stop = start + self.span  # already a half-open interval
                val = float(line_items[1])
                return (self.chrom, start, stop, val)

            if self.data_format == "fixedStep":
                start = self.counter - 1
                stop = start + self.span
                val = float(stripped)
                self.counter += self.step
                return (self.chrom, start, stop, val)

            # A data-like line seen outside any wiggle declaration is
            # skipped, matching the reader's historical behavior.