Source code for nutsflow.sink

"""
.. module:: sink
   :synopsis: Nuts that reads streams and potentially produce non-stream output.
"""
from __future__ import absolute_import

import sys
import csv
import math
import six

import collections as cl

from six.moves import reduce, zip, range
from nutsflow.base import NutSink
from nutsflow.factory import nut_sink
from nutsflow.common import as_tuple, is_iterable, colfunc
from nutsflow.iterfunction import nth, consume, length, take


[docs]@nut_sink def Sort(iterable, key=None, reverse=False): """ iterable >> Sort(key=None, reverse=False) Sorts iterable with respect to key function or column index(es). >>> [3, 1, 2] >> Sort() [1, 2, 3] >>> [3, 1, 2] >> Sort(reverse=True) [3, 2, 1] >>> [(1,'c'), (2,'b'), (3,'a')] >> Sort(1) [(3, 'a'), (2, 'b'), (1, 'c')] >>> ['a3', 'c1', 'b2'] >> Sort(key=lambda s: s[0]) ['a3', 'b2', 'c1'] >>> ['a3', 'c1', 'b2'] >> Sort(key=0) ['a3', 'b2', 'c1'] >>> ['a3', 'c1', 'b2'] >> Sort(1) ['c1', 'b2', 'a3'] >>> ['a3', 'c1', 'b2'] >> Sort((1,0)) ['c1', 'b2', 'a3'] :param iterable iterable: Iterable :param int|tuple|function|None key: function to sort based on or column index(es) tuples/vectors/strings are sorted by. :param boolean reverse: True: reverse order. :return: Sorted iterable :rtype: list """ return sorted(iterable, key=colfunc(key), reverse=reverse)
[docs]@nut_sink def Sum(iterable, key=None): """ iterable >> Sum(key=None) Return sum over inputs (transformed or extracted by key function) >>> [1, 2, 3] >> Sum() 6 >>> [1, 2, 3] >> Sum(lambda x: x*x) 14 >>> data = [(1, 10), (2, 20), (3, 30)] >>> data >> Sum(key=0) 6 >>> data >> Sum(key=1) 60 :param iterable iterable: Iterable over numbers :param int|tuple|function|None key: Key function to extract elements. :return: Sum of numbers :rtype: number """ return sum(map(colfunc(key), iterable))
[docs]@nut_sink def Mean(iterable, key=None, default=None): """ iterable >> Mean(key=None, default=None) Return mean value of inputs (transformed or extracted by key function). >>> [1, 2, 3] >> Mean() 2.0 >>> [] >> Mean(default=0) 0 >>> data = [(1, 10), (2, 20), (3, 30)] >>> data >> Mean(key=0) 2.0 >>> data >> Mean(key=1) 20.0 :param iterable iterable: Iterable over numbers :param object default: Value returned if iterable is empty. :param int|tuple|function|None key: Key function to extract elements. :return: Mean of numbers or default value :rtype: number """ f = colfunc(key) sum, n = 0, 0 for e in iterable: sum += f(e) n += 1 return float(sum) / n if n else default
[docs]@nut_sink def MeanStd(iterable, key=None, default=None, ddof=1): """ iterable >> MeanStd(key=None, default=None, ddof=1) Return mean and standard deviation of inputs (transformed or extracted by key function). Standard deviation is with degrees of freedom = 1 >>> [1, 2, 3] >> MeanStd() (2.0, 1.0) >>> data = [(1, 10), (2, 20), (3, 30)] >>> data >> MeanStd(key=0) (2.0, 1.0) >>> data >> MeanStd(1) (20.0, 10.0) :param iterable iterable: Iterable over numbers :param object default: Value returned if iterable is empty. :param int|tuple|function|None key: Key function to extract elements. :param int ddof: Delta degrees of freedom (should 0 or 1) :return: Mean and standard deviation of numbers or default value :rtype: tuple (mean, std) """ f = colfunc(key) sume, sqre, n = 0.0, 0.0, 0.0 for e in iterable: e = f(e) sume += e sqre += e * e n += 1.0 if n - ddof <= 0: return default avg = sume / n dev = math.sqrt((n * sqre - sume * sume) / (n * (n - ddof))) return avg, dev
[docs]@nut_sink def Max(iterable, key=None, default=None): """ iterable >> Max(key=None, default=None) Return maximum of inputs (transformed or extracted by key function). >>> [1, 2, 3, 2] >> Max() 3 >>> ['1', '123', '12'] >> Max(key=len) '123' >>> [] >> Max(default=0) 0 >>> data = [(3, 10), (2, 20), (1, 30)] >>> data >> Max(key=0) (3, 10) >>> data >> Max(1) (1, 30) :param iterable iterable: Iterable over numbers :param int|tuple|function|None key: Key function to extract or transform elements. None = identity function. :param object default: Value returned if iterable is empty. :return: largest element according to key function :rtype: object """ try: return max(iterable, key=colfunc(key)) except Exception: return default
[docs]@nut_sink def Min(iterable, key=None, default=None): """ iterable >> Min(key=None, default=None) Return minimum of inputs (transformed or extracted by key function). >>> [1, 2, 3, 2] >> Min() 1 >>> ['1', '123', '12'] >> Min(key=len) '1' >>> [] >> Min(default=0) 0 >>> data = [(3, 10), (2, 20), (1, 30)] >>> data >> Min(key=0) (1, 30) >>> data >> Min(1) (3, 10) :param iterable iterable: Iterable over numbers :param int|tuple|function|None key: Key function to extract or transform elements. None = identity function. :param object default: Value returned if iterable is empty. :return: smallest element according to key function :rtype: object """ try: return min(iterable, key=colfunc(key)) except Exception: return default
[docs]@nut_sink def ArgMax(iterable, key=None, default=None, retvalue=False): """ iterable >> ArgMax(key=None, default=None, retvalue=False) Return index of first maximum element (and maximum) in input (transformed or extracted by key function). >>> [1, 2, 0, 2] >> ArgMax() 1 >>> ['12', '1', '123'] >> ArgMax(key=len, retvalue=True) (2, '123') >>> ['12', '1', '123'] >> ArgMax(key=len) 2 >>> [] >> ArgMax(default=0) 0 >>> [] >> ArgMax(default=(None, 0), retvalue=True) (None, 0) >>> data = [(3, 10), (2, 20), (1, 30)] >>> data >> ArgMax(key=0) 0 >>> data >> ArgMax(1) 2 :param iterable iterable: Iterable over numbers :param int|tuple|function|None key: Key function to extract or transform elements. None = identity function. :param object default: Value returned if iterable is empty. :param bool retvalue: If True the index and the value of the maximum element is returned. :return: index of largest element according to key function and the largest element itself if retvalue==True :rtype: object | tuple """ try: f = colfunc(key) i, v = max(enumerate(iterable), key=lambda i_e: f(i_e[1])) return (i, v) if retvalue else i except Exception: return default
[docs]@nut_sink def ArgMin(iterable, key=None, default=None, retvalue=False): """ iterable >> ArgMin(key=None, default=None, retvalue=True) Return index of first minimum element (and minimum) in input (transformed or extracted by key function). >>> [1, 2, 0, 2] >> ArgMin() 2 >>> ['12', '1', '123'] >> ArgMin(key=len, retvalue=True) (1, '1') >>> ['12', '1', '123'] >> ArgMin(key=len) 1 >>> [] >> ArgMin(default=0) 0 >>> [] >> ArgMin(default=(None, 0), retvalue=True) (None, 0) >>> data = [(3, 10), (2, 20), (1, 30)] >>> data >> ArgMin(key=0) 2 >>> data >> ArgMin(1) 0 :param iterable iterable: Iterable over numbers :param int|tuple|function|None key: Key function to extract or transform elements. None = identity function. :param object default: Value returned if iterable is empty. :param bool retvalue: If True the index and the value of the minimum element is returned. :return: index of smallest element according to key function and the smallest element itself if retvalue==True. :rtype: object | tuple """ try: f = colfunc(key) i, v = min(enumerate(iterable), key=lambda i_e1: f(i_e1[1])) return (i, v) if retvalue else i except Exception: return default
Reduce = nut_sink(reduce, 1) """ iterable >> Reduce(func [,initiaizer]) Reduces the iterable using the given function. See https://docs.python.org/2/library/functions.html#reduce >>> [1, 2, 3] >> Reduce(lambda a,b: a+b) 6 >>> [2] >> Reduce(lambda a,b: a*b, 1) 2 :param iterable iterable: Any iterable :param function func: Reduction function :return: Result of reduction :rtype: any """ Nth = nut_sink(nth) """ iterable >> Nth(nth) Return n-th element of iterable. This consumes the iterable! >>> 'test' >> Nth(2) s :param iterable iterable: Any iterable :param int nth: Index of element in iterable to return :return: n-th element :rtype: any """ Next = nut_sink(next) """ iterable >> Next() Return next element of iterable. >>> [1,2,3] >> Next() 1 :param iterable iterable: Any iterable :return: next element :rtype: any """ Consume = nut_sink(consume) """ iterable >> Consume(n=None) Consume n elements of the iterable. >>> [1,2,3] >> Print() >> Consume() # Without Consume nothing happens! 1 2 3 >>> [1,2,3] >> Print() >> Consume(2) 1 2 :param iterable iterable: Iterable :param int n: Number of elements to consume. n = None means the whole iterable is consumed. """ Count = nut_sink(length) """ iterable >> Count() Return number elements in input iterable. This consumes the iterable! >>> [0, 1, 2] >> Count() 3 :param iterable iterable: Any iterable :return: Number elements in interable :rtype: int """
[docs]@nut_sink def Unzip(iterable, container=None): """ iterable >> Unzip(container=None) Same as izip(*iterable) but returns iterators for container=None >>> [(1, 2, 3), (4, 5, 6)] >> Unzip(tuple) >> Collect() [(1, 4), (2, 5), (3, 6)] :param iterable iterable: Any iterable, e.g. list, range, ... :param container container: If not none, unzipped results are collected in the provided container, eg. list, tuple, set :return: Unzip iterable. :rtype: iterator over iterators """ unzipped = zip(*iterable) return map(container, unzipped) if container else unzipped
[docs]@nut_sink def Tail(iterable, n, container=list): """ iterable >> Tail(n, container=list) Collect last n elements of iterable in specified container. This consumes the iterable completely! >>> [1, 2, 3, 4] >> Tail(2) [3, 4] :param iterable iterable: Any iterable, e.g. list, range, ... :param int n: Number of elements to take. :param container container: Container to collect elements in, e.g. list, set :return: Container with tail elements :rtype: container """ return container(cl.deque(iterable, n))
[docs]@nut_sink def CountValues(iterable, column=None, relative=False): """ iterable >> CountValues(relative=False) Return dictionary with (relative) counts of the values in the input iterable. >>> 'abaacc' >> CountValues() # doctest: +SKIP {'a': 3, 'b': 1, 'c': 2} >>> 'aabaab' >> CountValues(relative=True) # doctest: +SKIP {'a': 1.0, 'b': 0.5} >>> data = [('a', 'X'), ('b', 'Y'), ('a', 'Y')] >>> data >> CountValues(column=0) # doctest: +SKIP {'a': 2, 'b': 1} >>> data >> CountValues(column=1) # doctest: +SKIP {'Y': 2, 'X': 1} :param iterable iterable: Any iterable, e.g. list, range, ... :param int|None column: Column of values in iterable to extract values from. If colum=None the values in the iterable themselves will be counted. :param bool relative: True: return relative counts otherwise absolute counts :return: Dictionary with (relative) counts for elements in iterable. :rtype: dict """ values = iterable if column is None else (i[column] for i in iterable) cnts = dict(cl.Counter(values)) if not relative or not cnts.values(): return cnts max_cnt = max(cnts.values()) n = float(max_cnt) if max_cnt else 1.0 return {k: v / n for k, v in six.iteritems(cnts)}
[docs]@nut_sink def Collect(iterable, container=list): """ iterable >> Collect(container) Collects all elements of the iterable input in the given container. >>> range(5) >> Collect() [0, 1, 2, 3, 4] >>> [1, 2, 3, 2] >> Collect(set) # doctest: +SKIP {1, 2, 3} >>> [('one', 1), ('two', 2)] >> Collect(dict) # doctest: +SKIP {'one': 1, 'two': 2} :param iterable iterable: Any iterable, e.g. list, range, ... :param container container: Some container, e.g. list, set, dict that can be filled from an iterable :return: Container :rtype: container """ return container(iterable)
[docs]@nut_sink def Join(iterable, separator=''): """ iterable >> Join(separator='') Same as Python's sep.join(iterable). Concatenates the elements in the iterable to a string using the given separator. In addition to Python's sep.join(iterable) it also automatically converts elements to strings. :param iterable iterable: Any iterable :param string separator: Seperator string between elements. :return: String of with concatenated elements of iterable. :rtype: str """ return separator.join(map(str, iterable))
[docs]class WriteCSV(NutSink): """ Write data to a CSV file using Python's CSV writer. See: https://docs.python.org/2/library/csv.html """
[docs] def __init__(self, filepath, cols=None, skipheader=0, flush=False, encoding=None, fmtfunc=lambda x: x, **kwargs): """ WriteCSV(filepath, cols, skipheader, flush, fmtfunc, **kwargs) Write data in Comma Separated Values format (CSV) and other formats to file. Tab Separated Values (TSV) files can be written by specifying a different delimiter. Note that in the docstring below delimiter is '\\t' but in code it should be '\t'. See unit tests. Also see https://docs.python.org/2/library/csv.html and ReadCSV. >>> import os >>> filepath = 'tests/data/temp_out.csv' >>> with WriteCSV(filepath) as writer: ... range(10) >> writer >>> os.remove(filepath) >>> with WriteCSV(filepath, cols=(1,0)) as writer: ... [(1,2), (3,4)] >> writer >>> os.remove(filepath) >>> filepath = 'tests/data/temp_out.tsv' >>> with WriteCSV(filepath, delimiter='\\t') as writer: ... [[1,2], [3,4]] >> writer >>> os.remove(filepath) :param string filepath: Path to file in CSV format. :param tuple cols: Indices of the columns to write. If None all columns are written. :param int skipheader: Number of header rows to skip. :param bool flush: If True flush after every line written. :param str encoding: Character encoding, e.g. "utf-8" Ignored for Python 2.x! :param function fmtfunc: Function to apply to the elements of each row. :param kwargs kwargs: Keyword arguments for Python's CSV writer. See https://docs.python.org/2/library/csv.html """ open2 = lambda fp: open(fp, 'w') open3 = lambda fp: open(fp, 'w', encoding=encoding) is_py3 = sys.version_info >= (3, 0) self.csvfile = open3(filepath) if is_py3 else open2(filepath) self.columns = cols if cols is None else as_tuple(cols) self.flush = flush self.fmtfunc = fmtfunc self.skipheader = skipheader self.writer = csv.writer(self.csvfile, lineterminator='\n', **kwargs)
[docs] def close(self): """Close writer""" self.csvfile.close() self.writer = None
def __enter__(self): """Implementation of context manager API""" return self def __exit__(self, *args): """Implementation of context manager API""" self.close()
[docs] def __rrshift__(self, iterable): """Write elements of iterable to file""" cols = self.columns iterable = iter(iterable) for _ in range(self.skipheader): next(iterable) for row in iterable: row = row if is_iterable(row) else [row] row = [row[i] for i in cols] if cols else row row = [self.fmtfunc(r) for r in row] self.writer.writerow(row) if self.flush: self.csvfile.flush()