Source code for plastid.bin.test_table_equality

#!/usr/bin/env python
"""Regression-testing script designed to test equality between a newly-generated
file and a reference file that is intended to contain the same data. Rows and
columns are not expected to be in the same order. Float values are only required
to be equal within a user-specified tolerance. `NaN` values evaluate as equal
if and only if they occur in the same cell after sorting rows and columns.
Finally, specific columns may be excluded by name (or number, if there is no
header row).

Exit status is 0 if files are identical, 1 otherwise
"""
import argparse
import inspect

import sys
import numpy
import pandas as pd
from plastid.util.io.openers import opener, get_short_name, NullWriter
from plastid.util.io.filters import NameDateWriter, CommentReader

# Module-level writer that main() uses for its status messages. It is built
# from get_short_name() applied to the filename of the outermost frame on the
# call stack (``inspect.stack()[-1][1]``).
# NOTE(review): presumably this labels messages with the name of the launching
# script -- confirm against get_short_name / NameDateWriter.
printer = NameDateWriter(get_short_name(inspect.stack()[-1][1]))

# numpy ``dtype.kind`` codes that equal_enough() treats as numeric:
# b = boolean, i = signed integer, u = unsigned integer, f = float, c = complex
_NUMERIC_DTYPES = "biufc"


def _null_mask(col):
    """Return a boolean mask marking missing cells (NaN / None / NaT) in a non-numeric column."""
    # Only object and datetime-like arrays can hold missing values; fixed-width
    # string arrays cannot, so skip the (comparatively slow) null scan for them.
    if col.dtype.kind in "OMm":
        return numpy.asarray(pd.isnull(col), dtype=bool)
    return numpy.zeros(col.shape, dtype=bool)


def equal_enough(col1, col2, tol=1e-10, printer=NullWriter()):
    """Test whether two columns hold equal data, within `tol` if numeric.

    If `col1` and `col2` are both numeric, test that all their values are
    within `tol` of each other. :obj:`numpy.nan` values, if present, must be
    in the same place in each column. Ditto :obj:`numpy.inf` values, which
    must additionally have the same sign. Boolean columns count as numeric
    and are compared as 0/1.

    If neither `col1` nor `col2` is numeric, return `True` if they hold the
    same values in all cells. Missing values (`NaN`, `None`, `NaT`) are
    considered equal when they occupy the same cell in both columns.

    If one column is numeric and the other is not, return `False`.

    Parameters
    ----------
    col1 : numpy.ndarray
        First column of data

    col2 : numpy.ndarray
        Second column of data, same length as `col1`

    tol : float
        Error tolerance for numeric data between col1 and col2, value-wise

    printer : anything implementing a ``write()`` method (e.g. a |NameDateWriter|)
        If not `None`, rich comparison information will be sent to this writer

    Returns
    -------
    bool
        `True` if `col1 == col2` for non-numeric data;
        `True` if `abs(col1 - col2) <= tol` for numeric data;
        `False` otherwise
    """
    # The docstring has always promised that `None` is acceptable here.
    if printer is None:
        printer = NullWriter()

    numeric1 = col1.dtype.kind in _NUMERIC_DTYPES
    numeric2 = col2.dtype.kind in _NUMERIC_DTYPES

    if numeric1 and numeric2:
        # numpy refuses to subtract boolean arrays (TypeError), so compare
        # booleans as small integers instead.
        if col1.dtype.kind == "b":
            col1 = col1.astype(numpy.int8)
        if col2.dtype.kind == "b":
            col2 = col2.astype(numpy.int8)

        nan1 = numpy.isnan(col1)
        nan2 = numpy.isnan(col2)
        inf1 = numpy.isinf(col1)
        inf2 = numpy.isinf(col2)

        # check nans in same place
        nan_test = (nan1 == nan2).all()

        # make sure infs are in same place, and are same sign
        inf_test = (inf1 == inf2).all() & (col1[inf1] == col2[inf1]).all()

        # Apply the tolerance only to cells that are finite in BOTH columns.
        # Misplaced NaN/Inf values are already reported by the tests above, so
        # they should not also trigger a misleading tolerance failure.
        comparable = ~(nan1 | nan2 | inf1 | inf2)
        diff_test = (abs(col1[comparable] - col2[comparable]) <= tol).all()

        if not nan_test:
            printer.write("Failed nan test")
        if not inf_test:
            printer.write("Failed inf test")
        if not diff_test:
            printer.write("Failed tolerance test")

        return bool(nan_test & diff_test & inf_test)

    if not numeric1 and not numeric2:
        # NaN != NaN under ``==``, so a missing value in the same cell of both
        # columns has to be recognised explicitly to count as a match.
        both_missing = _null_mask(col1) & _null_mask(col2)
        same = numpy.asarray(col1 == col2, dtype=bool) | both_missing

        if not same.all():
            mismatched = ~same
            ltmp = [
                "%s, %s, %s" % (N, val1, val2)
                for N, (val1, val2) in enumerate(zip(col1[mismatched], col2[mismatched]))
            ]
            printer.write("Differences:\n%s" % "\n".join(ltmp))

        return bool(same.all())

    # One column is numeric and the other is not: never equal.
    return False
def _sorted_labels(labels):
    """Return column labels in a reproducible order, even if their types are mixed."""
    try:
        return sorted(labels)
    except TypeError:
        # e.g. a mix of integer and string labels, which cannot be compared
        return sorted(labels, key=str)


def _order_rows(df, sort_columns):
    """Return a copy of `df` with rows in a reproducible order for comparison."""
    if len(sort_columns) > 0:
        return df.sort_index(axis=1).sort_values(sort_columns, axis=0)

    # Break ties by value using the SAME column priority for every dataframe
    # (label order, not each file's own column order), then order by index.
    # The index sort must be stable, or the tie-breaking above may be lost.
    by_value = df.sort_values(_sorted_labels(df.columns), axis=0)
    return by_value.sort_index(kind="mergesort")


def test_dataframe_equality(
    df1,
    df2,
    tol=1e-8,
    sort_columns=None,
    printer=NullWriter(),
    print_verbose=False,
    return_verbose=False
):  # yapf: disable
    """Test equality of dataframes over multiple columns, with verbose output.

    Columns are matched by name, so their order may differ between the
    dataframes. If `NaNs` or `Infs` are present, these must be present in
    corresponding cells in both dataframes for the dataframes to evaluate as
    equal.

    Parameters
    ----------
    df1 : :py:class:`pd.DataFrame`
        First dataframe

    df2 : :py:class:`pd.DataFrame`
        Second dataframe

    tol : float, optional
        Maximum tolerated difference between floating point numbers
        (Default: *1e-8*)

    sort_columns : list, optional
        List of column names or indices on which to sort data before
        comparing values. If empty or `None`, rows are ordered by their
        index, with ties broken by the values in all columns.

    printer : file-like, optional
        Any logger implementing a ``write()`` method

    print_verbose : bool, optional
        Send progress messages to `printer` (Default: False)

    return_verbose : bool, optional
        If *True*, return a list of failure messages

    Returns
    -------
    bool
        `True` if dataframes are equal, `False` otherwise

    list
        A list of strings explaining how `df1` and `df2` differ.
        Only returned if `return_verbose` is `True`
    """
    if sort_columns is None:
        sort_columns = []
    if printer is None:
        printer = NullWriter()

    failures = []
    keys1 = set(df1.columns)
    keys2 = set(df2.columns)
    retval = True

    if keys1 != keys2:
        failstrs = [
            "Tables contain different columns (unique keys shown below):",
            "    %s: %s" % (1, ", ".join([str(X) for X in keys1 - keys2])),
            "    %s: %s" % (2, ", ".join([str(X) for X in keys2 - keys1]))
        ]
        printer.write("\n".join(failstrs))
        failures.extend(failstrs)
        retval = False

    if len(df1) != len(df2):
        failstrs = [
            "Tables contain different numbers of rows:",
            "    %s: %s" % (1, len(df1)),
            "    %s: %s" % (2, len(df2))
        ]
        printer.write("\n".join(failstrs))
        failures.extend(failstrs)
        retval = False

    # Cell-by-cell comparison only makes sense for tables of the same shape,
    # so sorting is deferred until the structural checks above have passed.
    if retval:
        if print_verbose:
            printer.write("Sorting data...")

        df1 = _order_rows(df1, sort_columns)
        df2 = _order_rows(df2, sort_columns)

        if print_verbose:
            printer.write(
                "Testing equality of values by column with numerical tolerance %.3e..." % tol
            )

        unequal = []
        # Walk columns in label order so that verbose output is reproducible
        for k in _sorted_labels(keys1):
            if print_verbose:
                printer.write("    Testing column %s..." % k)

            # take out values to get sorted numpy arrays, avoiding weird reordering
            # done behind the scenes by pandas
            s1 = df1[k].values
            s2 = df2[k].values
            if not equal_enough(s1, s2, tol=tol, printer=printer):
                unequal.append(k)
            elif print_verbose:
                printer.write("        %s is identical" % k)

        if len(unequal) > 0:
            failstr = "Column values are unequal for columns: %s" % (
                ", ".join([str(X) for X in unequal])
            )
            printer.write(failstr)
            failures.append(failstr)
            retval = False

    if return_verbose:
        return retval, failures

    return retval
def main(argv=sys.argv[1:], verbose=False):
    """Command-line program: compare two tab-delimited tables for equivalence.

    Parameters
    ----------
    argv : list, optional
        A list of command-line arguments, which will be processed as if the
        script were called from the command line if :py:func:`main` is called
        directly. Default: `sys.argv[1:]`, the command-line arguments with
        which the script was invoked.

    verbose : bool, optional
        If `True`, also return the messages describing how the tables differ
        (Default: `False`). The ``-v`` command-line flag has the same effect.

    Returns
    -------
    int
        `0` if files contain equivalent data, `1` otherwise

    list
        Only returned if `verbose` is selected. Strings describing how the
        tables are unequal (e.g. which columns failed, et c).

    Notes
    -----
    When this module is being run as a script, the function does not return:
    it calls :py:func:`sys.exit` with the exit code instead.
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("file1", type=str)
    parser.add_argument("file2", type=str)
    parser.add_argument(
        "-v",
        dest="verbose",
        default=False,
        action="store_true",
        help="Give verbose output",
    )
    parser.add_argument(
        "--sort_keys",
        default=None,
        metavar="key",
        nargs="+",
        help="If specified, values will be sorted by the column(s) corresponding to these name or numbers (0-indexed) before comparison",
    )
    parser.add_argument(
        "--exclude",
        type=str,
        default=[],
        nargs="+",
        metavar="key",
        help="Key or number (0-indexed) of columns to exclude",
    )
    parser.add_argument(
        "--no_header",
        default=False,
        action="store_true",
        help="If specified, no header row is present. Columns "
        "for all other command-line flags "
        "must be referenced by number (starting at zero) "
        "rather than name, and will be assumed to be in "
        "the same order in both files.",
    )
    parser.add_argument(
        "--tol",
        type=float,
        default=1e-8,
        help="Tolerance by which floats are allowed to differ (Default: 1e-8)",
    )
    opts = parser.parse_args(argv)

    # Options shared by both reads; the sort keys become the row index.
    read_opts = dict(sep="\t", index_col=opts.sort_keys, comment="#")
    skip_columns = opts.exclude
    header_row = 0

    if opts.no_header:
        # Without a header row, columns can only be addressed by position
        header_row = None
        if opts.sort_keys is not None:
            read_opts["index_col"] = [int(key) for key in opts.sort_keys]
        skip_columns = [int(key) for key in skip_columns]

    tables = []
    for path in (opts.file1, opts.file2):
        with opener(path) as fh:
            tables.append(pd.read_table(fh, header=header_row, **read_opts))
    table1, table2 = tables

    if opts.exclude:
        printer.write("Excluding columns %s: " % ", ".join(opts.exclude))

    # Drop each excluded column from whichever table actually contains it
    for key in skip_columns:
        for table in (table1, table2):
            if key in table:
                table.pop(key)

    equivalent, messages = test_dataframe_equality(
        table1,
        table2,
        printer=printer,
        print_verbose=True,
        return_verbose=True,
        tol=opts.tol,
    )

    if equivalent:
        printer.write("Files contain equivalent data.")
        exit_code = 0
    else:
        printer.write("Files non-equivalent.")
        exit_code = 1

    # As a script, report the result through the process exit status
    if __name__ == "__main__":
        sys.exit(exit_code)

    # Explicit comparison on the caller-supplied flag is kept as-is so that
    # non-bool arguments are treated exactly as before.
    if opts.verbose or verbose == True:
        return exit_code, messages

    return exit_code
# Script entry point: run the comparison when this file is executed directly.
# main() itself calls sys.exit() with the exit status in that case.
if __name__ == "__main__":
    main()