#!/usr/bin/env python
"""Various wrappers and utilities for opening, closing, and writing files.
Important methods
-----------------
:py:func:`argsopener`
Opens a file for writing within a command-line script and writes to it
all command-line arguments as a pretty-printed dictionary of metadata,
commented out. The open file handle is then returned for subsequent
writing.
:py:func:`read_pl_table`
Wrapper function to open a table saved by one of :data:`plastid`'s
command-line scripts into a :class:`pandas.DataFrame`.
:py:func:`opener`
Guesses whether a file is bzipped, gzipped, zipped, or uncompressed based upon
file extension, opens it appropriately, and returns a file-like object.
:py:class:`NullWriter`
Writer whose underlying stream is the system's null location.
"""
import sys
import os
import pandas as pd
from plastid.util.io.filters import AbstractWriter
from plastid.util.services.mini2to3 import Iterable
class NullWriter(AbstractWriter):
    """Writer whose output goes to the platform's null device.

    The stream is opened on :data:`os.devnull`. On Unix-like systems & OSX
    this is typically ``/dev/null``; on Windows, simply ``nul``.
    """

    def __init__(self):
        # Open the null device directly and keep it as this writer's stream.
        null_device = open(os.devnull, "w")
        self.stream = null_device

    def filter(self, stream):
        """Return `stream` unchanged; no filtering is applied.

        Parameters
        ----------
        stream : object
            Data handed to the writer

        Returns
        -------
        object
            The same `stream` that was passed in
        """
        return stream

    def __repr__(self):
        # Unusual repr, but it renders nicely in Sphinx-generated docs.
        return "NullWriter()"

    def __str__(self):
        # The string form is intentionally the same as the repr.
        return self.__repr__()
def multiopen(inp, fn=None, args=None, kwargs=None):
    """Normalize filename/file-like/list of filename or file-like to a series of appropriate objects

    If `inp` is a single string, a single file-like object, or any other
    non-iterable object, it is treated as a one-element list. Then, for each
    element `x`, if `x` is a string, `fn` is applied to it and the result is
    yielded. Any other element is yielded unchanged.

    This is a generator: nothing is opened until it is iterated.

    Parameters
    ----------
    inp : str, file-like, or list-like of either of those
        Input describing file(s) to open

    fn : callable, optional
        Callable to apply to each string in `inp` to open it. If `None`,
        strings are yielded unchanged.

    args : tuple, optional
        Tuple of positional arguments to pass to `fn` after the string

    kwargs : dict, optional
        Keyword arguments to pass to `fn`

    Yields
    ------
    Object
        Result of applying `fn` to filename(s) in `inp`, or the element
        itself when it is not a string
    """
    if fn is None:
        # Identity opener; accepts (and ignores) whatever extra arguments
        # the caller supplied via `args` / `kwargs`.
        def fn(x, *_args, **_kwargs):
            return x

    if args is None:
        args = ()
    if kwargs is None:
        kwargs = {}

    # An open file handle is itself iterable (over its lines), so it must be
    # recognized explicitly; otherwise each line would be mistaken for a
    # filename and passed to `fn`.
    is_file_like = hasattr(inp, "read") or hasattr(inp, "write")

    if isinstance(inp, str) or is_file_like or not isinstance(inp, Iterable):
        items = [inp]
    else:
        items = inp

    for obj in items:
        if isinstance(obj, str):
            yield fn(obj, *args, **kwargs)
        else:
            yield obj
def opener(filename, mode="r", **kwargs):
    """Open a file, detecting whether it is compressed or not, based upon
    its file extension. Extensions are tested in the following order:

    +----------------+------------------+
    | File ends with | Presumed to be   |
    +================+==================+
    | .gz            | gzipped          |
    +----------------+------------------+
    | .bz2           | bzipped          |
    +----------------+------------------+
    | .zip           | zipped           |
    +----------------+------------------+
    | anything else  | uncompressed     |
    +----------------+------------------+

    Parameters
    ----------
    filename : str
        Name of file to open

    mode : str
        Mode in which to open file. See Python standard
        library documentation on file opening modes for
        choices (e.g. "r", "a", "w" with or without "b").
        For gzipped and bzipped files, "b" is appended when absent.
        For zipped files, any "b" or "t" is removed.

    **kwargs
        Other parameters to pass to appropriate file opener

    Returns
    -------
    object
        Whatever the selected opener returns: :class:`gzip.GzipFile`,
        :class:`bz2.BZ2File`, :class:`zipfile.ZipFile` (an archive object,
        not a plain file handle), or the result of built-in :func:`open`
    """
    # Compression modules are imported lazily so that only the one actually
    # needed is loaded.
    if filename.endswith(".gz"):
        import gzip
        call_func = gzip.GzipFile
        if "b" not in mode:
            mode += "b"
    elif filename.endswith(".bz2"):
        import bz2
        call_func = bz2.BZ2File
        if "b" not in mode:
            mode += "b"
    elif filename.endswith(".zip"):
        import zipfile
        call_func = zipfile.ZipFile
        # ZipFile accepts only the bare modes 'r', 'w', 'x' and 'a'; passing
        # e.g. "rb" raises ValueError, so binary/text flags are dropped.
        mode = mode.replace("b", "").replace("t", "")
    else:
        call_func = open

    return call_func(filename, mode, **kwargs)
# TODO: needs unittest
def read_pl_table(filename, **kwargs):
    r"""Load a table written by one of :data:`plastid`'s command-line scripts.

    The file is read with :func:`pandas.read_table`, using these defaults:

    ==========   =======
    Key          Value
    ----------   -------
    sep          `"\t"`
    comment      `"#"`
    index_col    `None`
    header       `0`
    ==========   =======

    Parameters
    ----------
    filename : str
        Name of file. Can be gzipped, bzipped, or zipped.

    kwargs : keyword arguments
        Other keyword arguments to pass to :func:`pandas.read_table`.
        Will override defaults.

    Returns
    -------
    :class:`pandas.DataFrame`
        Table of results
    """
    defaults = dict(sep="\t", comment="#", index_col=None, header=0)
    # Caller-supplied keywords take precedence over the defaults above.
    options = dict(defaults, **kwargs)
    return pd.read_table(filename, **options)
# TODO: needs unit test
def write_pl_table(df, filename, sep="\t", header=True, index=None, **kwargs):
    """Save DataFrame `df` as a delimited text table (tab-delimited, with header, by default)

    Thin wrapper around :meth:`pandas.DataFrame.to_csv`.

    Parameters
    ----------
    df : :class:`~pandas.DataFrame`
        DataFrame to save

    filename : str
        Name of file to create

    sep : str, optional
        Column delimiter (default: tab)

    header : bool, optional
        Forwarded as the `header` argument (default: `True`)

    index : optional
        Forwarded unchanged as the `index` argument (default: `None`)

    **kwargs : keyword arguments, optional
        Any keyword argument readable by :meth:`pandas.DataFrame.to_csv`.

    Returns
    -------
    object
        Whatever :meth:`pandas.DataFrame.to_csv` returns
    """
    # Gather everything bound for `to_csv` in one place; the named
    # parameters can never collide with keys in `kwargs`.
    csv_options = dict(kwargs, sep=sep, header=header, index=index)
    return df.to_csv(filename, **csv_options)
def get_short_name(inpt, separator=os.path.sep, terminator=""):
    r"""Gives the basename of a filename or module name passed as a string.
    If the string doesn't match the pattern specified by the separator
    and terminator, it is returned unchanged.

    Examples
    --------
    >>> get_short_name("test")
    'test'

    >>> get_short_name("test.py", terminator=".py")
    'test'

    >>> get_short_name("/home/jdoe/test.py", terminator=".py")
    'test'

    >>> get_short_name("/home/jdoe/test.py.py", terminator=".py")
    'test.py'

    >>> get_short_name("/home/jdoe/test.py.2")
    'test.py.2'

    >>> get_short_name("/home/jdoe/test.py.2", terminator=".py")
    'test.py.2'

    >>> get_short_name("plastid.bin.test", separator=r"\.", terminator="")
    'test'

    Parameters
    ----------
    inpt : str
        Input

    separator : str
        Regular expression on which `inpt` is split; the last piece is
        returned. When left at (or equal to) the platform path separator,
        it is matched literally. (default: :data:`os.path.sep`)

    terminator : str
        File terminator, removed once from the end of `inpt` if present
        (default: "")

    Returns
    -------
    str
        Portion of `inpt` after the last separator, without the terminator
    """
    import re

    # Require a non-empty terminator: every string "ends with" "", and
    # slicing with [:-0] would wipe out the whole input.
    if terminator and inpt.endswith(terminator):
        inpt = inpt[:-len(terminator)]

    # `separator` is a regular expression, but the platform path separator
    # is a literal; on Windows it is a lone backslash, which is not a valid
    # pattern unless escaped.
    if separator == os.path.sep:
        pattern = re.escape(separator)
    else:
        pattern = separator

    return re.split(pattern, inpt)[-1]
def argsopener(filename, namespace, mode="w", **kwargs):
    """Open a file for writing and prepend the command-line arguments to it,
    formatted as a pretty-printed dictionary in comment metadata.

    Parameters
    ----------
    filename : str
        Name of file to open. If it terminates in `'.gz'` or `'.bz2'`
        the filehandle will write to a gzipped or bzipped file

    namespace : :py:class:`argparse.Namespace`
        Namespace object from argparse.ArgumentParser

    mode : str
        Mode of writing (`'w'` or `'wb'`). A `'w'` is appended if missing.

    **kwargs
        Other keyword arguments to pass to file opener

    Returns
    -------
    open filehandle
        Handle positioned after the metadata comment, ready for further writing
    """
    write_mode = mode if "w" in mode else mode + "w"
    handle = opener(filename, write_mode, **kwargs)

    # The metadata header goes in first, before any caller output.
    metadata = args_to_comment(namespace)
    handle.write(metadata)
    return handle
def pretty_print_dict(dtmp):
    """Pretty prints an un-nested dictionary

    Entries are sorted by key and written one per line. Keys are wrapped in
    single quotes and left-aligned to a common width; string values are
    wrapped in single quotes as well. An empty dictionary yields a pair of
    braces around an empty line.

    Parameters
    ----------
    dtmp : dict
        Dictionary to format. Keys must be mutually sortable.

    Returns
    -------
    str
        pretty-printed dictionary
    """
    # Quote keys first so the column width is measured on the text that is
    # actually printed; this also allows non-string keys.
    rows = [("'%s'" % (k,), v) for k, v in sorted(dtmp.items(), key=lambda x: x[0])]

    # `or [0]` keeps max() from raising on an empty dictionary.
    width = max([len(quoted_key) for quoted_key, _ in rows] or [0])
    template = " {0:<%s} : {1}," % width

    lines = []
    for quoted_key, value in rows:
        if isinstance(value, str):
            value = "'%s'" % value
        lines.append(template.format(quoted_key, value))

    return "{\n%s\n}\n" % "\n".join(lines)