nutsflow package

Submodules

nutsflow.base module

class Nut(*args, **kwargs)[source]

Bases: object

Base class for all Nuts. Iterables or functions wrapped in Nuts can be chained using the ‘>>’ operator. The aim is code with an explicit data flow. See the following example using Python iterators versus Nuts:

>>> from six.moves import filter, range
>>> from itertools import islice
>>> list(islice(filter(lambda x: x > 5, range(10)), 3))
[6, 7, 8]
>>> from nutsflow import Range, Filter, Take, Collect, _
>>> Range(10) >> Filter(_ > 5) >> Take(3) >> Collect()
[6, 7, 8]
__call__(iterable)[source]

Nut (processor) can be called as a function and mapped on iterable elements within an iterable.

Parameters:iterable (iterable) – Iterable to process.
Returns:Iterable
Return type:iterable
__init__(*args, **kwargs)[source]

Constructor. Nuts (and derived classes) can have arbitrary arguments.

Parameters:
  • args (args) – Positional arguments.
  • kwargs (kwargs) – Keyword arguments.
__rrshift__(iterable)[source]

Chaining operator for Nuts. Needs to be overridden!

Takes an input iterable and produces some output iterable. If the number of elements in the input and the output iterable does not change consider NutFunction instead.

Parameters:iterable (iterable) – Iterable to process.
Returns:Iterable
Return type:iterable
Raise:NotImplementedError if not implemented.
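The chaining mechanism can be illustrated without nutsflow itself. The sketch below is a minimal, hypothetical re-creation and not the library's implementation: Python evaluates `iterable >> obj` by calling `obj.__rrshift__(iterable)` when the left operand does not implement `>>`. The class names MiniFilter, MiniTake and MiniCollect are assumptions made for illustration only.

```python
from itertools import islice


class MiniFilter:
    """Hypothetical processor: keeps elements for which pred is true."""

    def __init__(self, pred):
        self.pred = pred

    def __rrshift__(self, iterable):
        # Invoked for `iterable >> MiniFilter(...)`.
        return filter(self.pred, iterable)


class MiniTake:
    """Hypothetical processor: passes on the first n elements."""

    def __init__(self, n):
        self.n = n

    def __rrshift__(self, iterable):
        return islice(iterable, self.n)


class MiniCollect:
    """Hypothetical sink: aggregates the flow into a list."""

    def __rrshift__(self, iterable):
        return list(iterable)


result = range(10) >> MiniFilter(lambda x: x > 5) >> MiniTake(3) >> MiniCollect()
print(result)  # [6, 7, 8]
```

Each stage only consumes the iterable handed to it from the left, which is why the data flow reads from left to right.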
class NutFunction(*args, **kwargs)[source]

Bases: nutsflow.base.Nut

Nut functions are mapped onto each element of the input iterable.

Example: Square is a Nut function

>>> from nutsflow import Square, Collect, _
>>> [1,2,3] >> Square() >> Collect()
[1, 4, 9]
__call__(element)[source]

Override this method to transform the elements of an iterable.

Parameters:element – Element the function is applied to.
Returns:A transformed element
Return type:any
Raise:NotImplementedError if not implemented.
__rrshift__(iterable)[source]

Map function onto iterable and return transformed iterable. Do not override!

Parameters:iterable – function is applied to the elements of the iterable.
Returns:transformed iterable.
Return type:iterable
class NutSink(*args, **kwargs)[source]

Bases: nutsflow.base.Nut

Sinks are nuts that do not guarantee to produce an iterable output.

Sinks are typically at the end of a flow and typically aggregate the flow to a single output, e.g. the sum of its elements. Need to override __rrshift__()!

__call__(iterable)[source]

Sinks can serve as functions applied to iterables within a flow.

Parameters:iterable – Sink takes iterable as input
Returns:Output of sink
Return type:any
class NutSource(*args, **kwargs)[source]

Bases: nutsflow.base.Nut

Sources are nuts that have no input iterable but produce an output iterable.

__rrshift__(iterable)[source]

Raises an exception when called. Sources have no input! Do not override! Override __iter__() instead.

Parameters:iterable (iterable) – Iterable
Raise:SyntaxError if called.

nutsflow.common module

class Redirect(channel='STDOUT')[source]

Bases: object

Redirect stdout or stderr to string.

>>> with Redirect() as out:
...     print('test')
>>> print(out.getvalue())
test
>>> import sys
>>> with Redirect('STDERR') as out:
...     print('error', file=sys.stderr)
>>> print(out.getvalue())
error
__init__(channel='STDOUT')[source]

Initialize self. See help(type(self)) for accurate signature.

class StableRandom(seed=None)[source]

Bases: random.Random

A pseudo random number generator that is stable across Python 2.x and 3.x. Use this only for unit tests or doctests. This class is derived from random.Random and supports all methods of the base class.

>>> rand = StableRandom(0)
>>> rand.random()
0.5488135024320365
>>> rand.randint(1, 10)
6
>>> lst = [1, 2, 3, 4, 5]
>>> rand.shuffle(lst)
>>> lst
[1, 3, 2, 5, 4]
__init__(seed=None)[source]

Initialize random number generator.

Parameters:seed (None|int) – Seed. If None the system time is used.
gauss_next()[source]

Return next gaussian random number.

Returns:Random number sampled from gaussian distribution.
Return type:float
getstate()[source]

Return state of generator.

Returns:Index and Mersenne Twister array.
Return type:tuple
jumpahead(n)[source]

Set state of generator far away from current state.

Parameters:n (int) – Distance to jump.
random()[source]

Return next random number in [0,1[

seed(seed=None)[source]

Set seed.

Parameters:seed (None|int) – Seed. If None the system time is used.
setstate(state)[source]

Set state of generator.

Parameters:state (tuple) – State to set as produced by getstate()
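Since StableRandom derives from random.Random, a getstate()/setstate() round trip should behave like the base-class protocol. The sketch below uses the standard library's random.Random as a stand-in (an assumption made so the example runs without nutsflow); the drawn values and the layout of the state tuple will differ from StableRandom's.

```python
import random

rng = random.Random(0)       # stand-in for StableRandom(0)
state = rng.getstate()       # snapshot taken before drawing numbers
first = [rng.random() for _ in range(3)]

rng.setstate(state)          # rewind the generator to the snapshot
second = [rng.random() for _ in range(3)]

print(first == second)  # True
```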
as_list(x)[source]

Return x as list.

If x is a single item it gets wrapped into a list otherwise it is changed to a list, e.g. tuple => list

Parameters:x (item or iterable) – Any item or iterable
Returns:list(x)
Return type:list
as_set(x)[source]

Return x as set.

If x is a single item it gets wrapped into a set otherwise it is changed to a set, e.g. list => set

Parameters:x (item or iterable) – Any item or iterable
Returns:set(x)
Return type:set
as_tuple(x)[source]

Return x as tuple.

If x is a single item it gets wrapped into a tuple otherwise it is changed to a tuple, e.g. list => tuple

Parameters:x (item or iterable) – Any item or iterable
Returns:tuple(x)
Return type:tuple
colfunc(key)[source]

Return function that extracts element from columns.

Used to create key functions when only column index or tuple of column indices is given. For instance:

>>> data = ['a3', 'c1', 'b2']
>>> sorted(data, key=colfunc(0))  # == sorted(data, key=lambda s: s[0])
['a3', 'b2', 'c1']
>>> sorted(data, key=colfunc(1))
['c1', 'b2', 'a3']
>>> list(map(colfunc((1,0)), data))
[['3', 'a'], ['1', 'c'], ['2', 'b']]
Parameters:key (function|None) – function or None. If None the identity function is returned
Returns:Column extraction function.
Return type:function
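The behaviour shown in the doctests above can be approximated in plain Python. The following is a hypothetical sketch, not the library's code; the name colfunc_sketch and the handling of callables are assumptions.

```python
def colfunc_sketch(key):
    """Hypothetical re-creation of the documented behaviour of colfunc."""
    if key is None:
        return lambda x: x                  # identity function
    if callable(key):
        return key                          # assumed: functions pass through
    if isinstance(key, int):
        return lambda x: x[key]             # single column index
    return lambda x: [x[k] for k in key]    # several columns, in given order


data = ['a3', 'c1', 'b2']
by_digit = sorted(data, key=colfunc_sketch(1))
swapped = list(map(colfunc_sketch((1, 0)), data))
print(by_digit)  # ['c1', 'b2', 'a3']
print(swapped)   # [['3', 'a'], ['1', 'c'], ['2', 'b']]
```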
console(*args, **kwargs)[source]

Print to stdout and flush.

Wrapper around Python’s print function that ensures flushing after each call.

>>> console('test')
test
Parameters:
  • args – Arguments
  • kwargs – Key-Word arguments.
is_iterable(obj)[source]

Return true if object has iterator but is not a string

Parameters:obj (object) – Any object
Returns:True if object is iterable but not a string.
Return type:bool
sec_to_hms(duration)[source]

Return hours, minutes and seconds for given duration.

>>> sec_to_hms('80')
(0, 1, 20)
Parameters:duration (int|str) – Duration in seconds. Can be int or string.
Returns:tuple (hours, minutes, seconds)
Return type:(int, int, int)
timestr(duration, fmt='{:d}:{:02d}:{:02d}')[source]

Return duration as formatted time string or empty string if no duration

>>> timestr('80')
'0:01:20'
Parameters:
  • duration (int|str) – Duration in seconds. Can be int or string.
  • fmt (str) – Format for string, e.g. ‘{:d}:{:02d}:{:02d}’
Returns:

duration as formatted time, e.g. ‘0:01:20’ or ‘’ if duration shorter than one second.

Return type:

string
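The conversion behind sec_to_hms() and timestr() amounts to two integer divisions. The sketch below is an illustration under assumptions: the function names are hypothetical, and the empty-string case for missing durations is deliberately left out.

```python
def sec_to_hms_sketch(duration):
    """Split a duration in seconds into (hours, minutes, seconds)."""
    seconds = int(duration)                  # accepts int or numeric string
    minutes, seconds = divmod(seconds, 60)
    hours, minutes = divmod(minutes, 60)
    return hours, minutes, seconds


def timestr_sketch(duration, fmt='{:d}:{:02d}:{:02d}'):
    """Format a duration with the same default format string as above."""
    return fmt.format(*sec_to_hms_sketch(duration))


print(sec_to_hms_sketch('80'))  # (0, 1, 20)
print(timestr_sketch(3725))     # 1:02:05
```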

nutsflow.factory module

nut_filter(func)[source]

Decorator for Nut filters.

Also see nut_filterfalse(). Example on how to define a custom filter nut:

@nut_filter
def GreaterThan(x, threshold):
    return x > threshold

[1, 2, 3, 4] >> GreaterThan(2) >> Collect()  --> [3, 4] 
Parameters:func (function) – Function to decorate. Must return boolean value.
Returns:Nut filter for given function
Return type:Nut
nut_filterfalse(func)[source]

Decorator for Nut filters that are inverted.

Also see nut_filter(). Example on how to define a custom filter-false nut:

@nut_filterfalse
def NotGreaterThan(x, threshold):
    return x > threshold

[1, 2, 3, 4] >> NotGreaterThan(2) >> Collect()  --> [1, 2]
Parameters:func (function) – Function to decorate. Must return boolean value.
Returns:Nut filter for given function.
Return type:Nut
nut_function(func)[source]

Decorator for Nut functions.

Example on how to define a custom function nut:

@nut_function
def TimesN(x, n):
    return x * n

[1, 2, 3] >> TimesN(2) >> Collect()  -->  [2, 4, 6]
Parameters:func (function) – Function to decorate
Returns:Nut function for given function
Return type:NutFunction
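To make the idea of such a decorator concrete, here is a minimal sketch of how a plain function could be turned into a chainable, element-wise mapper. It is not the nutsflow implementation; the decorator name element_wise and the inner Wrapper class are hypothetical.

```python
def element_wise(func):
    """Hypothetical decorator: make func(x, *args) usable with `>>`."""

    class Wrapper:
        def __init__(self, *args, **kwargs):
            # Extra arguments are stored and passed on with every element.
            self.args, self.kwargs = args, kwargs

        def __rrshift__(self, iterable):
            # Lazily apply func to each element of the incoming iterable.
            return (func(x, *self.args, **self.kwargs) for x in iterable)

    Wrapper.__name__ = func.__name__
    return Wrapper


@element_wise
def TimesN(x, n):
    return x * n


doubled = list([1, 2, 3] >> TimesN(2))
print(doubled)  # [2, 4, 6]
```

The first parameter of the decorated function receives the element; all remaining parameters are supplied when the wrapper is instantiated.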
nut_processor(func, iterpos=0)[source]

Decorator for Nut processors.

Example on how to define a custom processor nut:

@nut_processor
def Clone(iterable, n):
    for e in iterable:
        for _ in xrange(n):
            yield e

[1, 2, 3] >> Clone(2) >> Collect()  --> [1, 1, 2, 2, 3, 3]
Parameters:
  • func (function) – Function to decorate
  • iterpos – Position of iterable in function arguments
Returns:

Nut processor for given function

Return type:

Nut

nut_sink(func, iterpos=0)[source]

Decorator for Nut sinks.

Example on how to define a custom sink nut:

@nut_sink
def Collect(iterable, container):
    return container(iterable)

xrange(5) >> Collect(tuple)  -->   (0, 1, 2, 3, 4)
Parameters:
  • func (function) – Function to decorate
  • iterpos – Position of iterable in function arguments
Returns:

Nut sink for given function

Return type:

NutSink

nut_source(func)[source]

Decorator for Nut sources.

Example on how to define a custom source nut:

@nut_source
def Range(start, end):
    return xrange(start, end)

Range(0, 5) >> Collect()  --> [0, 1, 2, 3, 4]
Parameters:func (function) – Function to decorate
Returns:Nut source for given function
Return type:NutSource

nutsflow.function module

class Counter(name, filterfunc=<function Counter.<lambda>>, value=0)[source]

Bases: nutsflow.base.NutFunction

Increment counter depending on elements in iterable. Intended mostly for debugging and monitoring. Avoid for standard processing of data. The function has side-effects but is thread-safe.

__call__(x)[source]

Increment counter.

Parameters:x (object) – Element in iterable
Returns:Unchanged element
Return type:Any
__init__(name, filterfunc=<function Counter.<lambda>>, value=0)[source]

counter = Counter(name, filterfunc, value)
iterable >> counter

>>> from nutsflow import Consume
>>> counter = Counter('smallerthan3', lambda x: x < 3, 1)
>>> range(10) >> counter >> Consume()
>>> counter
smallerthan3 = 4
Parameters:
  • name (str) – Name of the counter
  • filterfunc (func) – Filter function. Count only elements where filterfunc returns True.
  • value (int) – Initial value
reset(value=0)[source]

Reset counter to given value.

Parameters:value (int) – Reset value
class Format(*args, **kwargs)[source]

Bases: nutsflow.base.NutFunction

iterable >> Format(fmt)

Return input as formatted string. For format definition see: https://docs.python.org/2/library/string.html

>>> from nutsflow import Collect
>>> [1, 2, 3] >> Format('num:{}') >> Collect()
['num:1', 'num:2', 'num:3']
>>> [(1, 2), (3, 4)] >> Format('{0}:{1}') >> Collect()
['1:2', '3:4']
Parameters:
  • iterable (iterable) – Any iterable
  • fmt (string) – Formatting string, e.g. ‘{:02d}’
Returns:

Returns inputs as strings formatted as specified

Return type:

str

__call__(element)
class Get(*args, **kwargs)[source]

Bases: nutsflow.base.NutFunction

iterable >> Get(start, end, step)

Extract elements from x. Equivalent to Python slicing [start:end:step] but per element of the iterable.

>>> from nutsflow import Collect
>>> [(1, 2, 3), (4, 5, 6)] >> Get(1) >> Collect()
[2, 5]
>>> [(1, 2, 3), (4, 5, 6)] >> Get(0, 2) >> Collect()
[(1, 2), (4, 5)]
>>> [(1, 2, 3), (4, 5, 6)] >> Get(0, 3, 2) >> Collect()
[(1, 3), (4, 6)]
>>> [(1, 2, 3), (4, 5, 6)] >> Get(None) >> Collect()
[(1, 2, 3), (4, 5, 6)]
Parameters:
  • iterable (iterable) – Any iterable
  • x (indexable) – Any indexable input
  • start (int) – Start index for columns to extract from x. If start = None, x is returned.
  • end (int) – End index (not inclusive)
  • step (int) – Step index (same as slicing)
Returns:

Extracted elements

Return type:

object|list

__call__(element)
class GetCols(*args, **kwargs)[source]

Bases: nutsflow.base.NutFunction

iterable >> GetCols(*columns)

Extract elements in given order from x. Also useful to change the order of or clone elements in x.

>>> from nutsflow import Collect
>>> [(1, 2, 3), (4, 5, 6)] >> GetCols(1) >> Collect()
[(2,), (5,)]
>>> [[1, 2, 3], [4, 5, 6]] >> GetCols(2, 0) >> Collect()
[(3, 1), (6, 4)]
>>> [[1, 2, 3], [4, 5, 6]] >> GetCols((2, 0)) >> Collect()
[(3, 1), (6, 4)]
>>> [(1, 2, 3), (4, 5, 6)] >> GetCols(2, 1, 0) >> Collect()
[(3, 2, 1), (6, 5, 4)]
>>> [(1, 2, 3), (4, 5, 6)] >> GetCols(1, 1) >> Collect()
[(2, 2), (5, 5)]
Parameters:
  • iterable (iterable) – Any iterable
  • x (indexable) – Any indexable input
  • columns (int|tuple|args) – Indices of elements/columns in x to extract or a tuple with these indices.
Returns:

Extracted elements

Return type:

tuple

__call__(element)
class Identity(*args, **kwargs)[source]

Bases: nutsflow.base.NutFunction

iterable >> Identity()

Return the input unchanged.

>>> from nutsflow import Collect
>>> [1, 2, 3] >> Identity() >> Collect()
[1, 2, 3]
Parameters:
  • iterable (iterable) – Any iterable
  • x (any) – Any input
Returns:

Returns input unaltered

Return type:

object

__call__(element)
class NOP(*args, **kwargs)[source]

Bases: nutsflow.base.NutFunction

iterable >> NOP(*args)

No Operation. Useful to skip nuts. Same as commenting a nut out or removing it from a pipeline.

>>> from nutsflow import Collect, Square
>>> [1, 2, 3] >> NOP(Square()) >> Collect()
[1, 2, 3]
Parameters:
  • iterable (iterable) – Any iterable
  • x (object) – Any object
  • args (args) – Additional args are ignored.
Returns:

Returns input unaltered

Return type:

object

__call__(element)
class Print(fmtfunc=None, every_sec=0, every_n=0, filterfunc=<function Print.<lambda>>, end='\n')[source]

Bases: nutsflow.base.NutFunction

Print elements in iterable.

__call__(x)[source]

Return element x and potentially print its value

__init__(fmtfunc=None, every_sec=0, every_n=0, filterfunc=<function Print.<lambda>>, end='\n')[source]
iterable >> Print(fmtfunc=None, every_sec=0, every_n=0,
filterfunc=lambda x: True)

Return the input unchanged but print each element.

>>> from nutsflow import Consume
>>> [1, 2] >> Print() >> Consume()
1
2
>>> range(10) >> Print(every_n=3) >> Consume()
2
5
8
>>> even = lambda x: x % 2 == 0
>>> [1, 2, 3, 4] >> Print(filterfunc=even) >> Consume()
2
4
>>> [{'val': 1}, {'val': 2}] >> Print('number={val}') >> Consume()
number=1
number=2
>>> [[1, 2], [3, 4]] >> Print('number={1}:{0}') >> Consume()
number=2:1
number=4:3
>>> myfmt = lambda x: 'char='+x.upper()
>>> ['a', 'b'] >> Print(myfmt) >> Consume()
char=A
char=B
>>> range(5) >> Print('.', end=' ') >> Consume()
. . . . .
Parameters:
  • x (object) – Any input
  • fmtfunc (string|function) – Format string or function. fmtfunc is a standard Python str.format() string, see https://docs.python.org/2/library/string.html or a function that returns a string.
  • every_sec (float) – Print every given second, e.g. to print every 2.5 sec every_sec = 2.5
  • every_n (int) – Print every n-th call.
  • end (str) – Ending of text printed.
  • filterfunc (function) – Boolean function to filter print.
Returns:

Returns input unaltered

Return type:

object

Raise:

ValueError if fmtfunc is not string or function

class Sleep(*args, **kwargs)[source]

Bases: nutsflow.base.NutFunction

iterable >> Sleep(duration)

Return the input unchanged but sleep for each element.

>>> from nutsflow import Collect
>>> [1, 2, 3] >> Sleep(0.1) >> Collect()
[1, 2, 3]
Parameters:
  • iterable (iterable) – Any iterable
  • x (object) – Any input
  • duration (float) – Sleeping time in seconds.
Returns:

Returns input unaltered

Return type:

object

__call__(element)
class Square(*args, **kwargs)[source]

Bases: nutsflow.base.NutFunction

iterable >> Square()

Return squared input.

>>> from nutsflow import Collect
>>> [1, 2, 3] >> Square() >> Collect()
[1, 4, 9]
Parameters:
  • iterable (iterable) – Any iterable over numbers
  • x (number) – Any number
Returns:

Squared number

Return type:

number

__call__(element)

nutsflow.iterfunction module

class PrefetchIterator(iterable, num_prefetch=1)[source]

Bases: threading.Thread, object

Wrap an iterable in an iterator that prefetches elements.

Typically used to fetch samples or batches while the GPU processes the current batch. Keeps the CPU busy pre-processing data instead of waiting for the GPU to finish the batch.

>>> from __future__ import print_function
>>> for i in PrefetchIterator(range(4)):
...    print(i)
0
1
2
3
__init__(iterable, num_prefetch=1)[source]

Constructor.

Parameters:
  • iterable (iterable) – Iterable elements are fetched from.
  • num_prefetch (int) – Number of elements to pre-fetch.
run()[source]

Put elements in input iterable into queue.
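The producer/consumer pattern described here can be sketched with the standard library's threading and queue modules. This is an illustrative stand-in, not the PrefetchIterator implementation; the generator name prefetch_sketch and the sentinel _DONE are assumptions.

```python
import queue
import threading

_DONE = object()  # sentinel marking the end of the input


def prefetch_sketch(iterable, num_prefetch=1):
    """Yield elements of iterable while a worker thread reads ahead."""
    buffer = queue.Queue(maxsize=num_prefetch)

    def producer():
        for element in iterable:
            buffer.put(element)   # blocks while the buffer is full
        buffer.put(_DONE)

    threading.Thread(target=producer, daemon=True).start()
    while True:
        element = buffer.get()    # blocks until the producer delivers
        if element is _DONE:
            return
        yield element


fetched = list(prefetch_sketch(range(4)))
print(fetched)  # [0, 1, 2, 3]
```

The bounded queue limits how far the worker runs ahead, which corresponds to the role of num_prefetch above.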

chunked(iterable, n)[source]

Split iterable in chunks of size n, where each chunk is also an iterator.

for chunk in chunked(range(10), 3):
    for element in chunk:
        print(element)
>>> it = chunked(range(7), 2)
>>> list(map(tuple, it))
[(0, 1), (2, 3), (4, 5), (6,)]
Parameters:
  • iterable (iterable) – Any iterable, e.g. list, range, …
  • n – Chunk size
Returns:

Chunked iterable

Return type:

Iterator over iterators
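A simplified way to obtain this behaviour uses itertools.islice. The sketch below is an assumption-laden illustration (the name chunked_sketch is hypothetical) and, unlike a fully lazy implementation, it materialises each chunk before handing it out as an iterator.

```python
from itertools import islice


def chunked_sketch(iterable, n):
    """Yield iterators over consecutive chunks of at most n elements."""
    it = iter(iterable)
    while True:
        chunk = list(islice(it, n))   # read up to n elements
        if not chunk:
            return                    # input exhausted
        yield iter(chunk)


chunks = list(map(tuple, chunked_sketch(range(7), 2)))
print(chunks)  # [(0, 1), (2, 3), (4, 5), (6,)]
```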

consume(iterable, n=None)[source]

Consume n elements of the iterable.

>>> it = iter([1,2,3,4])
>>> consume(it, 2)
>>> next(it)
3

See https://docs.python.org/2/library/itertools.html

Parameters:
  • iterable (iterable) – Any iterable, e.g. list, range, …
  • n – Number of elements to consume. For n=None all are consumed.
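The itertools documentation referenced above contains a recipe of the same name. A version of that recipe is reproduced here as a sketch under the hypothetical name consume_sketch; whether nutsflow uses exactly this code is an assumption.

```python
from collections import deque
from itertools import islice


def consume_sketch(iterable, n=None):
    """Advance the iterator by n steps, or exhaust it if n is None."""
    if n is None:
        deque(iterable, maxlen=0)            # discard everything
    else:
        next(islice(iterable, n, n), None)   # skip exactly n elements


it = iter([1, 2, 3, 4])
consume_sketch(it, 2)
third = next(it)
print(third)  # 3
```

The effect is only visible on iterators; passing a list leaves the list itself untouched.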
flatmap(func, iterable)[source]

Map function to iterable and flatten.

>>> f = lambda n: str(n) * n
>>> list( flatmap(f, [1, 2, 3]) )
['1', '2', '2', '3', '3', '3']
>>> list( map(f, [1, 2, 3]) )  # map instead of flatmap
['1', '22', '333']
Parameters:
  • func (function) – Function to map on iterable.
  • iterable (iterable) – Any iterable, e.g. list, range, …
Returns:

Iterator of iterable elements transformed via func and flattened.

Return type:

Iterator
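Mapping followed by one level of flattening can be expressed with itertools.chain.from_iterable. This is a sketch of the idea with a hypothetical function name, not necessarily how the library implements it.

```python
from itertools import chain


def flatmap_sketch(func, iterable):
    """Apply func to each element and chain the resulting iterables."""
    return chain.from_iterable(map(func, iterable))


repeat_digit = lambda n: str(n) * n
flat = list(flatmap_sketch(repeat_digit, [1, 2, 3]))
print(flat)  # ['1', '2', '2', '3', '3', '3']
```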

flatten(iterable)[source]

Return flattened iterable.

>>> list(flatten([(1,2), (3,4,5)]))
[1, 2, 3, 4, 5]
Parameters:iterable (iterable) –
Returns:Iterator over flattened elements of iterable
Return type:Iterator
interleave(*iterables)[source]

Return generator that interleaves the elements of the iterables.

>>> list(interleave(range(5), 'abc'))
[0, 'a', 1, 'b', 2, 'c', 3, 4]
>>> list(interleave('12', 'abc', '+-'))
['1', 'a', '+', '2', 'b', '-', 'c']
Parameters:iterables (iterable) – Collection of iterables, e.g. lists, range, …
Returns:Interleaved iterables.
Return type:iterator
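One possible way to interleave iterables of unequal length is zip_longest with a private fill value that is filtered out afterwards. The names interleave_sketch and _MISSING are hypothetical, and the library may use a different approach.

```python
from itertools import chain, zip_longest

_MISSING = object()  # fill value that cannot collide with real data


def interleave_sketch(*iterables):
    """Take one element from each iterable in turn until all are exhausted."""
    rounds = zip_longest(*iterables, fillvalue=_MISSING)
    return (x for x in chain.from_iterable(rounds) if x is not _MISSING)


mixed = list(interleave_sketch(range(5), 'abc'))
print(mixed)  # [0, 'a', 1, 'b', 2, 'c', 3, 4]
```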
length(iterable)[source]

Return number of elements in iterable. Consumes iterable!

>>> length(range(10))
10
Parameters:iterable (iterable) – Any iterable, e.g. list, range, …
Returns:Length of iterable.
Return type:int
nth(iterable, n, default=None)[source]

Return n-th element of iterable. Consumes iterable!

>>> nth(range(10), 2)
2
>>> nth(range(10), 100, default=-1)
-1

See: https://docs.python.org/2/library/itertools.html#itertools.islice

Parameters:
  • iterable (iterable) – Any iterable, e.g. list, range, …
  • n – Index of element to retrieve.
  • default – Value to return when iterator is depleted
Returns:

nth element

Return type:

Any or default value.
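Following the islice reference above, the n-th element can be fetched by slicing from position n and taking the first result. The sketch mirrors the recipe from the itertools documentation under the hypothetical name nth_sketch.

```python
from itertools import islice


def nth_sketch(iterable, n, default=None):
    """Return the element at index n, or default if the input is too short."""
    return next(islice(iterable, n, None), default)


print(nth_sketch(range(10), 2))                # 2
print(nth_sketch(range(10), 100, default=-1))  # -1
```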

partition(iterable, pred)[source]

Split iterable into two partitions based on predicate function

>>> pred = lambda x: x < 6
>>> smaller, larger = partition(range(10), pred)
>>> list(smaller)
[0, 1, 2, 3, 4, 5]
>>> list(larger)
[6, 7, 8, 9]
Parameters:
  • iterable – Any iterable, e.g. list, range, …
  • pred – Predicate function.
Returns:

Partition iterators

Return type:

Two iterators
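A common way to split one iterable into two lazily evaluated partitions is itertools.tee combined with filter and filterfalse. The sketch below assumes, as in the example above, that elements satisfying the predicate come first; the function name is hypothetical.

```python
from itertools import filterfalse, tee


def partition_sketch(iterable, pred):
    """Return (elements where pred is true, elements where pred is false)."""
    t1, t2 = tee(iterable)   # two independent iterators over the same input
    return filter(pred, t1), filterfalse(pred, t2)


smaller, larger = partition_sketch(range(10), lambda x: x < 6)
smaller, larger = list(smaller), list(larger)
print(smaller)  # [0, 1, 2, 3, 4, 5]
print(larger)   # [6, 7, 8, 9]
```

Note that tee buffers elements internally, so consuming one partition completely before the other keeps the whole input in memory.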

take(iterable, n)[source]

Return iterator over first n elements of given iterable.

>>> list(take(range(10), 3))
[0, 1, 2]

See: https://docs.python.org/2/library/itertools.html#itertools.islice

Parameters:
  • iterable (iterable) – Any iterable, e.g. list, range, …
  • n (int) – Number of elements to take
Returns:

Iterator over first n elements

Return type:

iterator

unique(iterable, key=None)[source]

Return only unique elements in iterable. Potentially high memory consumption, since elements already seen must be remembered!

>>> list(unique([2,3,1,1,2,4]))
[2, 3, 1, 4]
>>> ''.join(unique('this is a test'))
'this ae'
>>> data = [(1,'a'), (2,'a'), (3,'b')]
>>> list(unique(data, key=lambda t: t[1]))
[(1, 'a'), (3, 'b')]
Parameters:
  • iterable (iterable) – Any iterable, e.g. list, range, …
  • key – Function used to compare for equality.
Returns:

Iterator over unique elements.

Return type:

Iterator
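The memory warning follows from the usual implementation strategy: every key seen so far is kept in a set. The sketch below illustrates that strategy under the hypothetical name unique_sketch and assumes hashable keys.

```python
def unique_sketch(iterable, key=None):
    """Yield elements whose key has not been seen before, in input order."""
    seen = set()                 # grows with the number of distinct keys
    for element in iterable:
        k = element if key is None else key(element)
        if k not in seen:
            seen.add(k)
            yield element


letters = ''.join(unique_sketch('this is a test'))
pairs = list(unique_sketch([(1, 'a'), (2, 'a'), (3, 'b')], key=lambda t: t[1]))
print(letters)  # this ae
print(pairs)    # [(1, 'a'), (3, 'b')]
```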

nutsflow.processor module

class Append(*args, **kwargs)[source]

Bases: nutsflow.base.Nut

iterable >> Append(items)

Append item(s) to lists/tuples in iterable.

>>> [(1, 2), (3, 4)] >> Append('X') >> Collect()
[(1, 2, 'X'), (3, 4, 'X')]
>>> items = ['a', 'b']
>>> [(1, 2), (3, 4)] >> Append(items) >> Collect()
[(1, 2, 'a'), (3, 4, 'b')]
>>> items = [('a', 'b'), ('c', 'd')]
>>> [(1, 2), (3, 4)] >> Append(items) >> Collect()
[(1, 2, 'a', 'b'), (3, 4, 'c', 'd')]
>>> from nutsflow import Enumerate
>>> [(1, 2), (3, 4)] >> Append(Enumerate()) >> Collect()
[(1, 2, 0), (3, 4, 1)]
Parameters:
  • iterable (iterable) – Any iterable over tuples or lists
  • items (iterable|object) – A single object or an iterable over objects.
Returns:

iterator where items are appended to the iterable elements.

Return type:

iterator over tuples

__rrshift__(iterable)
class Cache(cachepath=None, clearcache=True)[source]

Bases: nutsflow.base.Nut

A very naive implementation of a disk cache. Pickles elements of iterable to file system and loads them the next time instead of recomputing.

__init__(cachepath=None, clearcache=True)[source]

iterable >> Cache()

Cache elements of iterable to disk. Only worth it if elements of iterable are time-consuming to produce and can be loaded faster from disk.

with Cache() as cache:
    data = range(100)
    for i in range(10):
        data >> expensive_op >> cache >> process(i) >> Consume()

cache = Cache()
for _ in range(100):
    data >> expensive_op >> cache >> Collect()
cache.clear()

with Cache('path/to/mycache') as cache:
    for _ in range(100):
        data >> expensive_op >> cache >> Collect()
Parameters:
  • iterable (iterable) – Any iterable
  • cachepath (string) – Path to a folder that stores the cached objects. If the path does not exist it will be created. The path with all its contents will be deleted when the cache is deleted. For cachepath=None a temporary folder will be created. Path to this folder is available in cache.path.
  • clearcache (bool) – Clear left-over cache if it exists.
Returns:

Iterator over elements

Return type:

iterator

__rrshift__(iterable)[source]

Return elements in iterable.

Parameters:iterable (iterable) – Any iterable
Returns:Generator over same elements as input iterable.
Return type:Generator
clear()[source]

Clear cache

class Chunk(*args, **kwargs)[source]

Bases: nutsflow.base.Nut

iterable >> Chunk(n, container=None)

Split iterable into chunks of size n, where each chunk is also an iterator if no container is provided. See also GroupBySorted(), ChunkWhen(), ChunkBy().

>>> from nutsflow import Range, Map, Print, Join, Consume, Collect
>>> Range(5) >> Chunk(2) >> Map(list) >> Print() >> Consume()
[0, 1]
[2, 3]
[4]

The code can be shortened by providing a container in Chunk():

>>> Range(5) >> Chunk(2, list) >> Print() >> Consume()
[0, 1]
[2, 3]
[4]
>>> Range(6) >> Chunk(3, Join('_')) >> Print() >> Consume()
0_1_2
3_4_5
>>> Range(6) >> Chunk(3, sum) >> Collect()
[3, 12]
Parameters:
  • iterable (iterable) – Any iterable, e.g. list, range, …
  • n (int) – Chunk size
  • container (container) – Some container, e.g. list, set, dict that can be filled from an iterable
Returns:

Chunked iterable

Return type:

Iterator over iterators or containers

__rrshift__(iterable)
class ChunkBy(*args, **kwargs)[source]

Bases: nutsflow.base.Nut

iterable >> ChunkBy(func, container=None)

Chunk iterable and start a new chunk every time the return value of func changes. See also GroupBySorted(), Chunk(), ChunkWhen().

>>> [1,1, 2, 3,3,3] >> ChunkBy(lambda x: x, tuple) >> Collect()
[(1, 1), (2,), (3, 3, 3)]
>>> [1,1, 2, 3,3,3] >> ChunkBy(lambda x: x < 3, tuple)  >> Collect()
[(1, 1, 2), (3, 3, 3)]
Parameters:
  • iterable (iterable) – Any iterable, e.g. list, range, …
  • func (function) – Function the iterable is chunked by
  • container (container) – Some container, e.g. list, set, dict that can be filled from an iterable
Returns:

Chunked iterable

Return type:

Iterator over iterators or containers
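Chunking on changes of a function value is closely related to itertools.groupby, which also starts a new group whenever the key changes. The following sketch reproduces the two examples above without nutsflow; the function name chunk_by_sketch and the default container are assumptions.

```python
from itertools import groupby


def chunk_by_sketch(iterable, func, container=tuple):
    """Start a new chunk whenever func(element) changes its value."""
    return (container(group) for _, group in groupby(iterable, key=func))


data = [1, 1, 2, 3, 3, 3]
by_value = list(chunk_by_sketch(data, lambda x: x))
by_threshold = list(chunk_by_sketch(data, lambda x: x < 3))
print(by_value)      # [(1, 1), (2,), (3, 3, 3)]
print(by_threshold)  # [(1, 1, 2), (3, 3, 3)]
```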

__rrshift__(iterable)
class ChunkWhen(func, container=None)[source]

Bases: nutsflow.base.Nut

__init__(func, container=None)[source]

iterable >> ChunkWhen(func, container=None)

Chunk iterable and create a new chunk every time func returns True. See also GroupBySorted(), Chunk(), ChunkBy().

>>> from nutsflow import Map, Join, Collect
>>> func = lambda x: x == 1
>>> [1,2,1,3,1,4,5] >> ChunkWhen(func, tuple) >> Collect()
[(1, 2), (1, 3), (1, 4, 5)]
>>> func = lambda x: x == 1
>>> [1,2,1,3,1,4,5] >> ChunkWhen(func, sum) >> Collect()
[3, 4, 10]
>>> func = lambda x: x == '|'
>>> '0|12|345|6' >> ChunkWhen(func, Join()) >> Collect()
['0', '|12', '|345', '|6']
Parameters:
  • func (function) – Boolean function that indicates chunks. New chunk is created if return value is True.
  • container (container) – Some container, e.g. list, set, dict that can be filled from an iterable
__rrshift__(iterable)[source]
Parameters:iterable (iterable) – Iterable to create chunks for.
Returns:Iterator over chunks, where each chunk is an iterator itself if no container is provided
Return type:iterator over iterators or containers
class Clone(*args, **kwargs)[source]

Bases: nutsflow.base.Nut

iterable >> Clone(n)

Clones elements in the iterable n times.

>>> from nutsflow import Range, Collect, Join
>>> Range(4) >> Clone(2) >> Collect()
[0, 0, 1, 1, 2, 2, 3, 3]
>>> 'abc' >> Clone(3) >> Join()
'aaabbbccc'
Parameters:
  • iterable (iterable) – Any iterable
  • n – Number of clones
Returns:

Generator over cloned elements in iterable

Return type:

generator

__rrshift__(iterable)
Combine

alias of itertools._create_nut_wrapper.<locals>.Wrapper

class Concat(*args, **kwargs)[source]

Bases: nutsflow.base.Nut

iterable >> Concat(*iterables)

Concatenate iterables.

>>> from nutsflow import Range, Collect
>>> Range(5) >> Concat('abc') >> Collect()
[0, 1, 2, 3, 4, 'a', 'b', 'c']
>>> '12' >> Concat('abcd', '+-') >> Collect()
['1', '2', 'a', 'b', 'c', 'd', '+', '-']
Parameters:
  • iterable (iterable) – Any iterable
  • iterables (iterable) – Iterables to concatenate
Returns:

Concatenated iterators

Return type:

iterator

__rrshift__(iterable)
Cycle

alias of itertools._create_nut_wrapper.<locals>.Wrapper

Dedupe

alias of nutsflow.iterfunction._create_nut_wrapper.<locals>.Wrapper

class Drop(*args, **kwargs)[source]

Bases: nutsflow.base.Nut

iterable >> Drop(n)

Drop first n elements in iterable.

>>> [1, 2, 3, 4] >> Drop(2) >> Collect()
[3, 4]
Parameters:
  • iterable (iterable) – Any iterable
  • n (int) – Number of elements to drop
Returns:

Iterator without dropped elements

Return type:

iterator

__rrshift__(iterable)
class DropWhile(*args, **kwargs)[source]

Bases: nutsflow.base.Nut

iterable >> DropWhile(func)

Skip elements in iterable while predicate function is True.

>>> from nutsflow import _
>>> [0, 1, 2, 3, 0] >> DropWhile(_ < 2) >> Collect()
[2, 3, 0]
Parameters:
  • iterable (iterable) – Any iterable
  • func (function) – Predicate function.
Returns:

Iterable

Return type:

Iterator

__rrshift__(iterable)
Filter

alias of builtins._create_nut_wrapper.<locals>.Wrapper

class FilterCol(*args, **kwargs)[source]

Bases: nutsflow.base.Nut

iterable >> FilterCol(columns, func)

Filter elements from iterable based on predicate function and specified column(s).

>>> is_even = lambda n: n % 2 == 0
>>> [(0, 'e'), (1, 'o'), (2, 'e')] >> FilterCol(0, is_even) >> Collect()
[(0, 'e'), (2, 'e')]
Parameters:
  • iterable (iterable) – Any iterable
  • columns (int|tuple) – Column or columns to extract from each element before passing it on to the predicate function.
  • func (function) – Predicate function. Element is removed if False.
Returns:

Filtered iterable

Return type:

Iterator

__rrshift__(iterable)
FilterFalse

alias of itertools._create_nut_wrapper.<locals>.Wrapper

FlatMap

alias of nutsflow.iterfunction._create_nut_wrapper.<locals>.Wrapper

class Flatten(*args, **kwargs)[source]

Bases: nutsflow.base.Nut

iterable >> Flatten()

Flatten the iterables within the iterable and non-iterables are passed through. Only one level is flattened. Chain Flatten to flatten deeper structures.

>>> from nutsflow import Collect
>>> [(1, 2), (3, 4, 5), 6] >> Flatten() >> Collect()
[1, 2, 3, 4, 5, 6]
>>> [(1, (2)), (3, (4, 5)), 6] >> Flatten() >> Flatten() >> Collect()
[1, 2, 3, 4, 5, 6]
Parameters:iterable (iterable) – Any iterable.
Returns:Flattened iterable
Return type:Iterator
__rrshift__(iterable)
class FlattenCol(*args, **kwargs)[source]

Bases: nutsflow.base.Nut

iterable >> FlattenCol(columns)

Flattens the specified columns of the tuples/iterables within the iterable. Only one level is flattened.

([1, 2], [3, 4]), ([5, 6], [7, 8]) >> FlattenCol((0, 1)) >> (1, 3), (2, 4), (5, 7), (6, 8)

If a column contains a single element (instead of an iterable) it is wrapped into a repeater. This allows flattening iterable columns together with non-iterable columns, e.g.

([1, 2], 3), (6, [7, 8]) >> FlattenCol((0, 1)) >> (1, 3), (2, 3), (6, 7), (6, 8)

>>> from nutsflow import Collect
>>> data = [([1, 2], [3, 4]), ([5, 6], [7, 8])]
>>> data >> FlattenCol(0) >> Collect()
[(1,), (2,), (5,), (6,)]
>>> data >> FlattenCol((0, 1)) >> Collect()
[(1, 3), (2, 4), (5, 7), (6, 8)]
>>> data >> FlattenCol((1, 0)) >> Collect()
[(3, 1), (4, 2), (7, 5), (8, 6)]
>>> data >> FlattenCol((1, 1, 0)) >> Collect()
[(3, 3, 1), (4, 4, 2), (7, 7, 5), (8, 8, 6)]
>>> data = [([1, 2], 3), (6, [7, 8])]
>>> data >> FlattenCol((0, 1)) >> Collect()
[(1, 3), (2, 3), (6, 7), (6, 8)]
Parameters:
  • iterable (iterable) – Any iterable.
  • columns (int|tuple) – Column index or indices
Returns:Flattened columns of iterable
Return type:generator
__rrshift__(iterable)
class GroupBy(*args, **kwargs)[source]

Bases: nutsflow.base.Nut

iterable >> GroupBy(keycol=lambda x: x, nokey=False)

Group elements of iterable based on a column value of the element or the function value of keycol for the element. Note that elements of iterable do not need to be sorted. GroupBy will store all elements in memory! If the iterable is sorted use GroupBySorted() instead. See also Chunk(), ChunkWhen(), ChunkBy().

>>> from nutsflow import Sort
>>> [1, 2, 1, 1, 3] >> GroupBy() >> Sort()
[(1, [1, 1, 1]), (2, [2]), (3, [3])]
>>> [1, 2, 1, 1, 3] >> GroupBy(nokey=True) >> Sort()
[[1, 1, 1], [2], [3]]
>>> ['--', '+++', '**'] >> GroupBy(len) >> Sort()
[(2, ['--', '**']), (3, ['+++'])]
>>> ['a3', 'b2', 'c1'] >> GroupBy(1) >> Sort()
[('1', ['c1']), ('2', ['b2']), ('3', ['a3'])]
>>> [(1,3), (2,2), (3,1)] >> GroupBy(1, nokey=True) >> Sort()
[[(1, 3)], [(2, 2)], [(3, 1)]]
Parameters:
  • iterable (iterable) – Any iterable
  • keycol (int|function) – Column index or key function.
  • nokey (bool) – True: results will not contain keys for groups, only the groups themselves.
Returns:

Iterator over groups.

Return type:

iterator
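Grouping unsorted input requires collecting all elements per key first, which is why the whole iterable ends up in memory. The sketch below shows this with a dictionary of lists; it is a simplified stand-in (hypothetical name, key functions only, no column indices) and not the library's implementation.

```python
from collections import defaultdict


def group_by_sketch(iterable, keyfunc=lambda x: x, nokey=False):
    """Group elements by keyfunc; the input does not need to be sorted."""
    groups = defaultdict(list)
    for element in iterable:
        groups[keyfunc(element)].append(element)   # all elements are stored
    return iter(groups.values()) if nokey else iter(groups.items())


by_len = sorted(group_by_sketch(['--', '+++', '**'], len))
no_keys = sorted(group_by_sketch([1, 2, 1, 1, 3], nokey=True))
print(by_len)   # [(2, ['--', '**']), (3, ['+++'])]
print(no_keys)  # [[1, 1, 1], [2], [3]]
```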

__rrshift__(iterable)
class GroupBySorted(*args, **kwargs)[source]

Bases: nutsflow.base.Nut

iterable >> GroupBySorted(keycol=lambda x: x, nokey=False)

Group elements of iterable based on a column value of the element or the function value of keycol for the element. Iterable needs to be sorted according to keycol! See https://docs.python.org/2/library/itertools.html#itertools.groupby. If iterable is not sorted use GroupBy() but be aware that it stores all elements of the iterable in memory! See also Chunk(), ChunkWhen(), ChunkBy().

>>> from nutsflow import Collect, Map, nut_sink
>>> @nut_sink
... def ViewResult(iterable):
...     return iterable >> Map(lambda t: (t[0], list(t[1]))) >> Collect()
>>> [1, 1, 1, 2, 3] >> GroupBySorted() >> ViewResult()
[(1, [1, 1, 1]), (2, [2]), (3, [3])]
>>> [1, 1, 1, 2, 3] >> GroupBySorted(nokey=True) >> Map(list) >> Collect()
[[1, 1, 1], [2], [3]]
>>> ['--', '**', '+++'] >> GroupBySorted(len) >> ViewResult()
[(2, ['--', '**']), (3, ['+++'])]
Parameters:
  • iterable (iterable) – Any iterable
  • keycol (int|function) – Column index or key function.
  • nokey (bool) – True: results will not contain keys for groups, only the groups themselves.
Returns:

Iterator over groups where values are iterators.

Return type:

iterator

__rrshift__(iterable)
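
The sorting requirement comes from itertools.groupby, which only merges adjacent equal keys. A short standard-library illustration (it does not use nutsflow itself):

```python
from itertools import groupby

unsorted = [1, 2, 1, 1, 3]

# On unsorted input the key 1 shows up in two separate groups.
split = [(k, list(g)) for k, g in groupby(unsorted)]
print(split)   # [(1, [1]), (2, [2]), (1, [1, 1]), (3, [3])]

# After sorting, each key forms exactly one group.
merged = [(k, list(g)) for k, g in groupby(sorted(unsorted))]
print(merged)  # [(1, [1, 1, 1]), (2, [2]), (3, [3])]
```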
class If(*args, **kwargs)[source]

Bases: nutsflow.base.Nut

iterable >> If(cond, if_nut [, else_nut])

Depending on condition cond execute if_nut or else_nut. Useful for conditional flows.

>>> from nutsflow import Square, Take, Collect
>>> [1, 2, 3] >> If(True, Square()) >> Collect()
[1, 4, 9]
>>> [1, 2, 3] >> If(False, Square(), Take(1)) >> Collect()
[1]
Parameters:
  • iterable (iterable) – Any iterable
  • cond (bool) – Boolean conditional value.
  • if_nut (Nut) – Nut to be executed if cond == True
  • else_nut (Nut) – Nut to be executed if cond == False
Returns:

Result of if_nut or else_nut

Return type:

Any

__rrshift__(iterable)
class Insert(*args, **kwargs)[source]

Bases: nutsflow.base.Nut

iterable >> Insert(index, items)

Insert item(s) into lists/tuples in iterable.

>>> [(1, 2), (3, 4)] >> Insert(1, 'X') >> Collect()
[(1, 'X', 2), (3, 'X', 4)]
>>> items = ['a', 'b']
>>> [(1, 2), (3, 4)] >> Insert(2, items) >> Collect()
[(1, 2, 'a'), (3, 4, 'b')]
>>> items = [('a', 'b'), ('c', 'd')]
>>> [(1, 2), (3, 4)] >> Insert(1, items) >> Collect()
[(1, 'a', 'b', 2), (3, 'c', 'd', 4)]
>>> from nutsflow import Enumerate
>>> [(1, 2), (3, 4)] >> Insert(0, Enumerate()) >> Collect()
[(0, 1, 2), (1, 3, 4)]
Parameters:
  • iterable (iterable) – Any iterable over tuples or lists
  • index (int) – Index at which position items are inserted.
  • items (iterable|object) – A single object or an iterable over objects.
Returns:

iterator where items are inserted into the iterable elements.

Return type:

iterator over tuples

__rrshift__(iterable)
class Interleave(*args, **kwargs)[source]

Bases: nutsflow.base.Nut

iterable >> Interleave(*iterables)

Interleave elements of iterable with elements of given iterables. Similar to iterable >> Zip(*iterables) >> Flatten() but longest iterable determines length of interleaved iterator.

>>> from nutsflow import Range, Collect
>>> Range(5) >> Interleave('abc') >> Collect()
[0, 'a', 1, 'b', 2, 'c', 3, 4]
>>> '12' >> Interleave('abcd', '+-') >> Collect()
['1', 'a', '+', '2', 'b', '-', 'c', 'd']
Parameters:
  • iterable (iterable) – Any iterable
  • iterables (iterable) – Iterables to interleave
Returns:

Iterator over interleaved elements.

Return type:

iterator

__rrshift__(iterable)
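
The difference between Interleave and Zip followed by Flatten can be sketched with the standard library. The interleave function below is a hypothetical stand-in that assumes the semantics described above (the longest input determines the length); it is not nutsflow's code.

```python
from itertools import chain, zip_longest

_MISSING = object()  # sentinel marking exhausted inputs


def interleave(*iterables):
    """Alternate elements; the longest input determines the length."""
    padded = zip_longest(*iterables, fillvalue=_MISSING)
    for element in chain.from_iterable(padded):
        if element is not _MISSING:
            yield element


print(list(interleave(range(5), 'abc')))
# [0, 'a', 1, 'b', 2, 'c', 3, 4]

# Zip and flatten stops with the shortest input instead.
print(list(chain.from_iterable(zip(range(5), 'abc'))))
# [0, 'a', 1, 'b', 2, 'c']
```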
Map

alias of builtins._create_nut_wrapper.<locals>.Wrapper

class MapCol(*args, **kwargs)[source]

Bases: nutsflow.base.Nut

iterable >> MapCol(columns, func)

Apply given function to given columns of elements in iterable.

>>> neg = lambda x: -x
>>> [(1, 2), (3, 4)] >> MapCol(0, neg) >> Collect()
[(-1, 2), (-3, 4)]
>>> [(1, 2), (3, 4)] >> MapCol(1, neg) >> Collect()
[(1, -2), (3, -4)]
>>> [(1, 2), (3, 4)] >> MapCol((0, 1), neg) >> Collect()
[(-1, -2), (-3, -4)]
Parameters:
  • iterable (iterable of iterables) – Any iterable that contains iterables
  • columns (int|tuple of ints) – Column index or tuple of indexes
  • func (function) – Function to apply to elements
Returns:

Iterator over tuples

Return type:

iterator of tuple

__rrshift__(iterable)
class MapMulti(*args, **kwargs)[source]

Bases: nutsflow.base.Nut

iterable >> MapMulti(*funcs)

Map multiple functions on iterable. For each function a separate iterable is returned. Can consume large amounts of memory when iterables are processed sequentially!

>>> from nutsflow import Collect, _
>>> nums, twos, greater2 = [1, 2, 3] >> MapMulti(_, _ * 2, _ > 2)
>>> nums >> Collect()
[1, 2, 3]
>>> twos >> Collect()
[2, 4, 6]
>>> greater2 >> Collect()
[False, False, True]
Parameters:
  • iterable (iterable) – Any iterable
  • funcs (functions) – Functions to map
Returns:

Iterators for each function

Return type:

(iterator, ..)

__rrshift__(iterable)
class MapPar(func, chunksize=8)[source]

Bases: nutsflow.base.Nut

__init__(func, chunksize=8)[source]

iterable >> MapPar(func, chunksize=mp.cpu_count())

Map function in parallel. Order of iterable is preserved. Note that MapPar is of limited use since ‘func’ must be picklable and only top-level functions (not class methods) are picklable. See https://docs.python.org/2/library/pickle.html

>>> from nutsflow import Collect
>>> [-1, -2, -3] >> MapPar(abs) >> Collect()
[1, 2, 3]
Parameters:
  • iterable (iterable) – Any iterable
  • func (function) – Function to map
  • chunksize (int) – Number of parallel processes to use for mapping.
Returns:

Iterator over mapped elements

Return type:

iterator
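
Whether a function can be handed to a parallel map can be checked up front with the pickle module. The helper is_picklable is hypothetical and only illustrates the restriction mentioned above.

```python
import pickle


def is_picklable(func):
    """Return True if func can be serialized with pickle."""
    try:
        pickle.dumps(func)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


print(is_picklable(abs))           # True: builtins are pickled by name
print(is_picklable(lambda x: -x))  # False: a lambda has no importable name
```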

__rrshift__(iterable)[source]

Partition

alias of nutsflow.iterfunction._create_nut_wrapper.<locals>.Wrapper

Permutate

alias of itertools._create_nut_wrapper.<locals>.Wrapper

class Pick(*args, **kwargs)[source]

Bases: nutsflow.base.Nut

iterable >> Pick(p_n, rand=None)

Pick every p_n-th element from the iterable if p_n is an integer, otherwise pick randomly with probability p_n.

>>> from nutsflow import Range, Collect
>>> from nutsflow.common import StableRandom
>>> [1, 2, 3, 4] >> Pick(0.0) >> Collect()
[]
>>> [1, 2, 3, 4] >> Pick(1.0) >> Collect()
[1, 2, 3, 4]
>>> Range(10) >> Pick(0.5, StableRandom(1)) >> Collect()
[0, 4, 5, 6, 8, 9]
>>> [1, 2, 3, 4] >> Pick(2) >> Collect()
[1, 3]
Parameters:
  • iterable (iterable) – Any iterable
  • p_n (float|int) – Probability p in [0, 1] or integer n for every n-th element
  • rand (Random|None) – Random number generator. If None, random.Random() is used.
Returns:

Iterator over picked elements.

Return type:

iterator

__rrshift__(iterable)
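
The two modes of Pick can be sketched as follows. The function pick is a hypothetical illustration; it assumes that counting for the integer case starts at the first element, which matches the Pick(2) example above.

```python
import random


def pick(iterable, p_n, rand=None):
    """Pick every p_n-th element (int) or with probability p_n (float)."""
    rand = random.Random() if rand is None else rand
    if isinstance(p_n, int):
        # Assumption: the first element is always picked.
        return (e for i, e in enumerate(iterable) if i % p_n == 0)
    # random() lies in [0, 1), so p_n = 1.0 keeps everything, 0.0 nothing.
    return (e for e in iterable if rand.random() < p_n)


print(list(pick([1, 2, 3, 4], 2)))    # [1, 3]
print(list(pick([1, 2, 3, 4], 0.0)))  # []
print(list(pick([1, 2, 3, 4], 1.0)))  # [1, 2, 3, 4]
```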
class Prefetch(*args, **kwargs)[source]

Bases: nutsflow.base.Nut

iterable >> Prefetch(num_prefetch=1)

Prefetch elements from iterable. Typically used to keep the CPU busy while the GPU is crunching.

>>> from nutsflow import Take, Consume
>>> it = iter([1, 2, 3, 4])
>>> it >> Prefetch(1) >> Take(1) >> Consume()
>>> next(it)   
3
Parameters:
  • iterable (iterable) – Any iterable
  • num_prefetch (int) – Number of elements to prefetch.
Returns:

Iterator over input elements

Return type:

iterator

__rrshift__(iterable)
class PrintProgress(data, every_sec=10.0)[source]

Bases: nutsflow.base.Nut

__init__(data, every_sec=10.0)[source]

iterable >> PrintProgress(data, every_sec=10.0)

Print progress on iterable. Requires that length of iterable is known beforehand. Data are just passed through. For long-running computations the estimated time of arrival (ETA) is printed as well.

range(10) >> PrintProgress(10, 0) >> Consume()

Parameters:
  • iterable (iterable) – Any iterable
  • data (int) – Number of elements in iterable or realized iterable. If data is provided it must not be an iterator since it will be consumed!
  • every_sec (float) – Progress is printed every ‘every_sec’ seconds.
Returns:

Iterator over input elements

Return type:

iterator

__rrshift__(iterable)[source]

class Shuffle(*args, **kwargs)[source]

Bases: nutsflow.base.Nut

iterable >> Shuffle(buffersize, rand=None)

Perform (partial) random shuffle of the elements in the iterable. Elements of the iterable are stored in a buffer of the given size and shuffled within. If buffersize is smaller than the length of the iterable the shuffle is therefore partial in the sense that the ‘window’ of the shuffle is limited to buffersize. Note that for buffersize = 1 no shuffling occurs.

In the following example rand = StableRandom(0) is used to create a fixed sequence that is stable across Python versions 2.x and 3.x. Usually, this is not what you want. Use the default rand=None, which uses random.Random() instead.

>>> from nutsflow import Range, Collect
>>> from nutsflow.common import StableRandom
>>> Range(10) >> Shuffle(5, StableRandom(0)) >> Collect()
[4, 2, 3, 6, 7, 0, 1, 9, 5, 8]
>>> Range(10) >> Shuffle(1, StableRandom(0)) >> Collect()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Parameters:
  • iterable (iterable) – Any iterable
  • buffersize (int) – Number of elements stored in shuffle buffer.
  • rand (Random|None) – Random number generator. If None, random.Random() is used.
Returns:

Generator over shuffled elements

Return type:

generator

__rrshift__(iterable)
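
A buffer-based partial shuffle can be sketched as below. This is one common scheme and an assumption for illustration; nutsflow's actual algorithm may differ in detail, so the order of the output will not match the doctest above. buffered_shuffle is a hypothetical name and assumes buffersize >= 1.

```python
import random


def buffered_shuffle(iterable, buffersize, rand=None):
    """Partial shuffle with a bounded buffer (hypothetical sketch)."""
    rand = random.Random() if rand is None else rand
    buffer = []
    for element in iterable:
        if len(buffer) < buffersize:
            buffer.append(element)       # fill the buffer first
            continue
        idx = rand.randrange(buffersize)
        yield buffer[idx]                # emit a random buffered element
        buffer[idx] = element            # and replace it with the new one
    rand.shuffle(buffer)                 # flush the remaining elements
    yield from buffer


# With buffersize = 1 the order is unchanged.
print(list(buffered_shuffle(range(10), 1)))
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

With this scheme an element can move at most buffersize - 1 positions toward the front, which is the limited window the description refers to.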
class Slice(*args, **kwargs)[source]

Bases: nutsflow.base.Nut

iterable >> Slice([start,] stop[, step])

Return slice of elements from iterable. See https://docs.python.org/2/library/itertools.html#itertools.islice

>>> from nutsflow import Collect
>>> [1, 2, 3, 4] >> Slice(2) >> Collect()
[1, 2]
>>> [1, 2, 3, 4] >> Slice(1, 3) >> Collect()
[2, 3]
>>> [1, 2, 3, 4] >> Slice(0, 4, 2) >> Collect()
[1, 3]
Parameters:
  • iterable (iterable) – Any iterable
  • start (int) – Start index of slice.
  • stop (int) – End index of slice.
  • step (int) – Step size of slice.
Returns:

Elements sliced from iterable

Return type:

iterator

__rrshift__(iterable)
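
Since the description refers to itertools.islice, the three Slice examples above correspond to the following standard-library calls (shown without nutsflow for comparison):

```python
from itertools import islice

data = [1, 2, 3, 4]

first_two = list(islice(data, 2))          # stop only
middle = list(islice(data, 1, 3))          # start, stop
every_other = list(islice(data, 0, 4, 2))  # start, stop, step

print(first_two, middle, every_other)
# [1, 2] [2, 3] [1, 3]
```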
class Take(*args, **kwargs)[source]

Bases: nutsflow.base.Nut

iterable >> Take(n)

Return first n elements of iterable

>>> from nutsflow import Collect
>>> [1, 2, 3, 4] >> Take(2) >> Collect()
[1, 2]
Parameters:
  • iterable (iterable) – Any iterable
  • n (int) – Number of elements to take
Returns:

First n elements of iterable

Return type:

iterator

__rrshift__(iterable)
TakeWhile

alias of itertools._create_nut_wrapper.<locals>.Wrapper

Tee

alias of itertools._create_nut_wrapper.<locals>.Wrapper

class Try(*args, **kwargs)[source]

Bases: nutsflow.base.Nut

iterable >> Try(func, default=’STDERR’)

Exception handling for (nut) functions. If the wrapped nut or function raises an exception, it is caught and handled according to the default parameter. By default the exception and the value causing it are printed to stderr and no output is returned for that element. Alternatively, a default value can be specified that is returned instead of the nut output if an exception occurs, or a handler function that computes the replacement value.

NOTE: In the following examples ‘STDOUT’ is used only to verify the error message within the doctest. In production code use the default value of ‘STDERR’.

>>> from nutsflow import Try, Collect, nut_function  
>>> [10, 2, 1] >> Try(lambda x : 10//x) >> Collect()
[1, 5, 10]
>>> [10, 0, 1] >> Try(lambda x : 10//x, 'STDOUT') >> Collect()
ERROR: 0 : integer division or modulo by zero
[1, 10]
>>> Div = nut_function(lambda x : 10//x)
>>> [10, 2, 1] >> Try(Div()) >> Collect()
[1, 5, 10]
>>> [10, 0, 1] >> Try(Div(), 'STDOUT') >> Collect()
ERROR: 0 : integer division or modulo by zero
[1, 10]
>>> [10, 0, 1] >> Try(Div(), -1) >> Collect()
[1, -1, 10]
>>> handlezero = lambda x, e: 'FAILED: '+str(x)
>>> [10, 0, 1] >> Try(Div(), handlezero) >> Collect()
[1, 'FAILED: 0', 10]
>>> handlezero = lambda x, e: str(e)
>>> [10, 0, 1] >> Try(Div(), handlezero) >> Collect()
[1, 'integer division or modulo by zero', 10]
Parameters:
  • iterable (iterable) – Iterable the nut operates on.
  • func (function|NutFunction) – (Nut) function that is wrapped for exception handling. Can be a plain Python function/method as well.
  • default (Object) –

    Return value if exception occurs.
    If default = ‘IGNORE’, no value is returned and no error is printed.
    If default = ‘STDERR’, no value is returned, error is printed to stderr.
    If default = ‘STDOUT’, no value is returned, error is printed to stdout.
    If default is a function that takes element x and exception e as parameters, its result is returned and no error is printed.
    Otherwise the default value is returned and no error is printed.

Returns:

Iterator over input elements transformed by provided nut.

Return type:

iterator

__rrshift__(iterable)
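
The handling rules for default can be summarized in a small plain-Python sketch. try_map is a hypothetical stand-in that follows the parameter description above; the error message format is an assumption modelled on the doctest output.

```python
import sys


def try_map(func, iterable, default='STDERR'):
    """Apply func to each element and handle exceptions per element."""
    for x in iterable:
        try:
            yield func(x)
        except Exception as e:
            if default == 'IGNORE':
                continue                      # drop element silently
            if default in ('STDERR', 'STDOUT'):
                stream = sys.stderr if default == 'STDERR' else sys.stdout
                print('ERROR: {} : {}'.format(x, e), file=stream)
            elif callable(default):
                yield default(x, e)           # handler computes replacement
            else:
                yield default                 # fixed replacement value


div = lambda x: 10 // x
print(list(try_map(div, [10, 0, 1], -1)))        # [1, -1, 10]
print(list(try_map(div, [10, 0, 1], 'IGNORE')))  # [1, 10]
```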
class Window(*args, **kwargs)[source]

Bases: nutsflow.base.Nut

iterable >> Window(n=2)

Sliding window of size n over elements in iterable.

>>> [1, 2, 3, 4] >> Window() >> Collect()
[(1, 2), (2, 3), (3, 4)]
>>> [1, 2, 3, 4] >> Window(3) >> Collect()
[(1, 2, 3), (2, 3, 4)]
>>> 'test' >> Window(2) >> Map(''.join) >> Collect()
['te', 'es', 'st']
Parameters:
  • iterable (iterable) – Any iterable
  • n (int) – Size of window
Returns:

iterator with tuples of length n

Return type:

iterator over tuples

__rrshift__(iterable)
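
A sliding window over an arbitrary iterable can be sketched with a bounded deque. The function window is a hypothetical illustration of the behaviour shown in the examples above, not nutsflow's implementation.

```python
from collections import deque


def window(iterable, n=2):
    """Yield tuples of n consecutive elements."""
    win = deque(maxlen=n)   # the oldest element drops out automatically
    for element in iterable:
        win.append(element)
        if len(win) == n:
            yield tuple(win)


print(list(window([1, 2, 3, 4])))           # [(1, 2), (2, 3), (3, 4)]
print([''.join(w) for w in window('test')])  # ['te', 'es', 'st']
```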
class Zip(*args, **kwargs)[source]

Bases: nutsflow.base.Nut

iterable >> Zip(*iterables)

Zip elements of iterable with elements of given iterables. Zip finishes when the shortest iterable is exhausted. See https://docs.python.org/2/library/itertools.html#itertools.izip and https://docs.python.org/2/library/itertools.html#itertools.izip_longest

>>> from nutsflow import Collect
>>> [0, 1, 2] >> Zip('abc') >> Collect()
[(0, 'a'), (1, 'b'), (2, 'c')]
>>> '12' >> Zip('abcd', '+-') >> Collect()
[('1', 'a', '+'), ('2', 'b', '-')]
Parameters:
  • iterable (iterable) – Any iterable
  • iterables (iterable) – Iterables to zip
Returns:

Zipped elements from iterables.

Return type:

iterator over tuples

__rrshift__(iterable)
class ZipWith(*args, **kwargs)[source]

Bases: nutsflow.base.Nut

iterable >> ZipWith(f, *iterables)

Zips the given iterables, unpacks them and applies the given function.

>>> add = lambda a, b: a + b
>>> [1, 2, 3] >> ZipWith(add, [2, 3, 4]) >> Collect()
[3, 5, 7]
Parameters:
  • iterable (iterable) – Any iterable
  • iterables (iterable) – Any iterables
  • f (function) – Function to apply to zipped input iterables
Returns:

iterator of result of f() applied to zipped iterables

Return type:

iterator

__rrshift__(iterable)

nutsflow.sink module

class ArgMax(*args, **kwargs)[source]

Bases: nutsflow.base.NutSink

iterable >> ArgMax(key=None, default=None, retvalue=False)

Return index of first maximum element (and maximum) in input (transformed or extracted by key function).

>>> [1, 2, 0, 2] >> ArgMax()
1
>>> ['12', '1', '123'] >> ArgMax(key=len, retvalue=True)
(2, '123')
>>> ['12', '1', '123'] >> ArgMax(key=len)
2
>>> [] >> ArgMax(default=0)
0
>>> [] >> ArgMax(default=(None, 0), retvalue=True)
(None, 0)
>>> data = [(3, 10), (2, 20), (1, 30)]
>>> data >> ArgMax(key=0)
0
>>> data >> ArgMax(1)
2
Parameters:
  • iterable (iterable) – Iterable over numbers
  • key (int|tuple|function|None) – Key function to extract or transform elements. None = identity function.
  • default (object) – Value returned if iterable is empty.
  • retvalue (bool) – If True the index and the value of the maximum element is returned.
Returns:

index of largest element according to key function and the largest element itself if retvalue==True

Return type:

object | tuple

__rrshift__(iterable)
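
The "index of first maximum" semantics can be sketched with enumerate and max. arg_max is a hypothetical helper that takes a key function only; the column-index form of key shown above is not covered here.

```python
def arg_max(iterable, key=lambda x: x, default=None, retvalue=False):
    """Return index of the first maximum, optionally with its value."""
    pairs = list(enumerate(iterable))
    if not pairs:
        return default
    # max() returns the first maximal item, so ties resolve to the
    # lowest index.
    index, value = max(pairs, key=lambda p: key(p[1]))
    return (index, value) if retvalue else index


print(arg_max([1, 2, 0, 2]))                                # 1
print(arg_max(['12', '1', '123'], key=len, retvalue=True))  # (2, '123')
print(arg_max([], default=0))                               # 0
```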
class ArgMin(*args, **kwargs)[source]

Bases: nutsflow.base.NutSink

iterable >> ArgMin(key=None, default=None, retvalue=False)

Return index of first minimum element (and minimum) in input (transformed or extracted by key function).

>>> [1, 2, 0, 2] >> ArgMin()
2
>>> ['12', '1', '123'] >> ArgMin(key=len, retvalue=True)
(1, '1')
>>> ['12', '1', '123'] >> ArgMin(key=len)
1
>>> [] >> ArgMin(default=0)
0
>>> [] >> ArgMin(default=(None, 0), retvalue=True)
(None, 0)
>>> data = [(3, 10), (2, 20), (1, 30)]
>>> data >> ArgMin(key=0)
2
>>> data >> ArgMin(1)
0
Parameters:
  • iterable (iterable) – Iterable over numbers
  • key (int|tuple|function|None) – Key function to extract or transform elements. None = identity function.
  • default (object) – Value returned if iterable is empty.
  • retvalue (bool) – If True the index and the value of the minimum element is returned.
Returns:

index of smallest element according to key function and the smallest element itself if retvalue==True.

Return type:

object | tuple

__rrshift__(iterable)
class Collect(*args, **kwargs)[source]

Bases: nutsflow.base.NutSink

iterable >> Collect(container=list)

Collects all elements of the iterable input in the given container.

>>> range(5) >> Collect()
[0, 1, 2, 3, 4]
>>> [1, 2, 3, 2] >> Collect(set)  
{1, 2, 3}
>>> [('one', 1), ('two', 2)] >> Collect(dict)  
{'one': 1, 'two': 2}
Parameters:
  • iterable (iterable) – Any iterable, e.g. list, range, …
  • container (container) – Some container, e.g. list, set, dict that can be filled from an iterable
Returns:

Container

Return type:

container

__rrshift__(iterable)
Consume

alias of nutsflow.iterfunction._create_nut_wrapper.<locals>.Wrapper

Count

alias of nutsflow.iterfunction._create_nut_wrapper.<locals>.Wrapper

class CountValues(*args, **kwargs)[source]

Bases: nutsflow.base.NutSink

iterable >> CountValues(column=None, relative=False)

Return dictionary with (relative) counts of the values in the input iterable.

>>> 'abaacc' >> CountValues()  
{'a': 3, 'b': 1, 'c': 2}
>>> 'aabaab' >> CountValues(relative=True)  
{'a': 1.0, 'b': 0.5}
>>> data = [('a', 'X'), ('b', 'Y'), ('a', 'Y')]
>>> data >> CountValues(column=0)  
{'a': 2, 'b': 1}
>>> data >> CountValues(column=1)  
{'Y': 2, 'X': 1}
Parameters:
  • iterable (iterable) – Any iterable, e.g. list, range, …
  • column (int|None) – Column of values in iterable to extract values from. If column=None the values in the iterable themselves will be counted.
  • relative (bool) – True: return relative counts otherwise absolute counts
Returns:

Dictionary with (relative) counts for elements in iterable.

Return type:

dict

__rrshift__(iterable)
class Head(*args, **kwargs)[source]

Bases: nutsflow.base.NutSink

iterable >> Head(n, container=list)

Collect first n elements of iterable in specified container.

>>> [1, 2, 3, 4] >> Head(2)
[1, 2]
Parameters:
  • iterable (iterable) – Any iterable, e.g. list, range, …
  • n (int) – Number of elements to take.
  • container (container) – Container to collect elements in, e.g. list, set
Returns:

Container with head elements

Return type:

container

__rrshift__(iterable)
class Join(*args, **kwargs)[source]

Bases: nutsflow.base.NutSink

iterable >> Join(separator='')

Same as Python’s sep.join(iterable). Concatenates the elements in the iterable to a string using the given separator. In addition to Python’s sep.join(iterable) it also automatically converts elements to strings.

Parameters:
  • iterable (iterable) – Any iterable
  • separator (string) – Separator string between elements.
Returns:

String with concatenated elements of iterable.

Return type:

str

__rrshift__(iterable)
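
The automatic string conversion is the main difference from Python's str.join. A plain-Python equivalent, with join as a hypothetical stand-in for the nut:

```python
def join(iterable, separator=''):
    """Concatenate elements, converting each one to a string first."""
    return separator.join(str(element) for element in iterable)


print(join([1, 2, 3], ', '))  # 1, 2, 3
print(join('abc'))            # abc

# Plain str.join would raise TypeError for non-string elements:
# ', '.join([1, 2, 3])
```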
class Max(*args, **kwargs)[source]

Bases: nutsflow.base.NutSink

iterable >> Max(key=None, default=None)

Return maximum of inputs (transformed or extracted by key function).

>>> [1, 2, 3, 2] >> Max()
3
>>> ['1', '123', '12'] >> Max(key=len)
'123'
>>> [] >> Max(default=0)
0
>>> data = [(3, 10), (2, 20), (1, 30)]
>>> data >> Max(key=0)
(3, 10)
>>> data >> Max(1)
(1, 30)
Parameters:
  • iterable (iterable) – Iterable over numbers
  • key (int|tuple|function|None) – Key function to extract or transform elements. None = identity function.
  • default (object) – Value returned if iterable is empty.
Returns:

largest element according to key function

Return type:

object

__rrshift__(iterable)
class Mean(*args, **kwargs)[source]

Bases: nutsflow.base.NutSink

iterable >> Mean(key=None, default=None)

Return mean value of inputs (transformed or extracted by key function).

>>> [1, 2, 3] >> Mean()
2.0
>>> [] >> Mean(default=0)
0
>>> data = [(1, 10), (2, 20), (3, 30)]
>>> data >> Mean(key=0)
2.0
>>> data >> Mean(key=1)
20.0
Parameters:
  • iterable (iterable) – Iterable over numbers
  • default (object) – Value returned if iterable is empty.
  • key (int|tuple|function|None) – Key function to extract elements.
Returns:

Mean of numbers or default value

Return type:

number

__rrshift__(iterable)
class MeanStd(*args, **kwargs)[source]

Bases: nutsflow.base.NutSink

iterable >> MeanStd(key=None, default=None, ddof=1)

Return mean and standard deviation of inputs (transformed or extracted by key function). By default the standard deviation is computed with delta degrees of freedom ddof = 1 (sample standard deviation).

>>> [1, 2, 3] >> MeanStd()
(2.0, 1.0)
>>> data = [(1, 10), (2, 20), (3, 30)]
>>> data >> MeanStd(key=0)
(2.0, 1.0)
>>> data >> MeanStd(1)
(20.0, 10.0)
Parameters:
  • iterable (iterable) – Iterable over numbers
  • default (object) – Value returned if iterable is empty.
  • key (int|tuple|function|None) – Key function to extract elements.
  • ddof (int) – Delta degrees of freedom (should be 0 or 1)
Returns:

Mean and standard deviation of numbers or default value

Return type:

tuple (mean, std)

__rrshift__(iterable)
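
The effect of ddof can be seen in a short sketch that uses the usual definition of the standard deviation with divisor n - ddof. mean_std is a hypothetical helper, not nutsflow's implementation.

```python
import math


def mean_std(values, ddof=1):
    """Return (mean, standard deviation) with divisor n - ddof."""
    values = list(values)
    n = len(values)
    mean = sum(values) / n
    variance = sum((v - mean) ** 2 for v in values) / (n - ddof)
    return mean, math.sqrt(variance)


print(mean_std([1, 2, 3]))          # (2.0, 1.0), sample std (ddof=1)
print(mean_std([1, 2, 3], ddof=0))  # population std, about 0.816
```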
class Min(*args, **kwargs)[source]

Bases: nutsflow.base.NutSink

iterable >> Min(key=None, default=None)

Return minimum of inputs (transformed or extracted by key function).

>>> [1, 2, 3, 2] >> Min()
1
>>> ['1', '123', '12'] >> Min(key=len)
'1'
>>> [] >> Min(default=0)
0
>>> data = [(3, 10), (2, 20), (1, 30)]
>>> data >> Min(key=0)
(1, 30)
>>> data >> Min(1)
(3, 10)
Parameters:
  • iterable (iterable) – Iterable over numbers
  • key (int|tuple|function|None) – Key function to extract or transform elements. None = identity function.
  • default (object) – Value returned if iterable is empty.
Returns:

smallest element according to key function

Return type:

object

__rrshift__(iterable)
Next

alias of builtins._create_nut_wrapper.<locals>.Wrapper

Nth

alias of nutsflow.iterfunction._create_nut_wrapper.<locals>.Wrapper

Reduce

alias of _functools._create_nut_wrapper.<locals>.Wrapper

class Sort(*args, **kwargs)[source]

Bases: nutsflow.base.NutSink

iterable >> Sort(key=None, reverse=False)

Sorts iterable with respect to key function or column index(es).

>>> [3, 1, 2] >> Sort()
[1, 2, 3]
>>> [3, 1, 2] >> Sort(reverse=True)
[3, 2, 1]
>>> [(1,'c'), (2,'b'), (3,'a')] >> Sort(1)
[(3, 'a'), (2, 'b'), (1, 'c')]
>>> ['a3', 'c1', 'b2'] >> Sort(key=lambda s: s[0])
['a3', 'b2', 'c1']
>>> ['a3', 'c1', 'b2'] >> Sort(key=0)
['a3', 'b2', 'c1']
>>> ['a3', 'c1', 'b2'] >> Sort(1)
['c1', 'b2', 'a3']
>>> ['a3', 'c1', 'b2'] >> Sort((1,0))
['c1', 'b2', 'a3']
Parameters:
  • iterable (iterable) – Iterable
  • key (int|tuple|function|None) – function to sort based on or column index(es) tuples/vectors/strings are sorted by.
  • reverse (boolean) – True: reverse order.
Returns:

Sorted iterable

Return type:

list

__rrshift__(iterable)
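
Sorting by column index(es) corresponds to using operator.itemgetter as the key function in plain Python. The snippet below mirrors two of the examples above without using nutsflow:

```python
from operator import itemgetter

rows = ['a3', 'c1', 'b2']

by_second = sorted(rows, key=itemgetter(1))              # column 1
by_second_then_first = sorted(rows, key=itemgetter(1, 0))  # columns (1, 0)

print(by_second)             # ['c1', 'b2', 'a3']
print(by_second_then_first)  # ['c1', 'b2', 'a3']
```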
class Sum(*args, **kwargs)[source]

Bases: nutsflow.base.NutSink

iterable >> Sum(key=None)

Return sum over inputs (transformed or extracted by key function)

>>> [1, 2, 3] >> Sum()
6
>>> [1, 2, 3] >> Sum(lambda x: x*x)
14
>>> data = [(1, 10), (2, 20), (3, 30)]
>>> data >> Sum(key=0)
6
>>> data >> Sum(key=1)
60
Parameters:
  • iterable (iterable) – Iterable over numbers
  • key (int|tuple|function|None) – Key function to extract elements.
Returns:

Sum of numbers

Return type:

number

__rrshift__(iterable)
class Tail(*args, **kwargs)[source]

Bases: nutsflow.base.NutSink

iterable >> Tail(n, container=list)

Collect last n elements of iterable in specified container. This consumes the iterable completely!

>>> [1, 2, 3, 4] >> Tail(2)
[3, 4]
Parameters:
  • iterable (iterable) – Any iterable, e.g. list, range, …
  • n (int) – Number of elements to take.
  • container (container) – Container to collect elements in, e.g. list, set
Returns:

Container with tail elements

Return type:

container

__rrshift__(iterable)
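
Collecting the last n elements requires reading the whole input, but only n elements need to be kept at a time. A sketch with a bounded deque, where tail is a hypothetical helper:

```python
from collections import deque


def tail(iterable, n, container=list):
    """Return the last n elements; consumes the iterable completely."""
    return container(deque(iterable, maxlen=n))


print(tail([1, 2, 3, 4], 2))           # [3, 4]
print(tail(iter(range(100)), 3, tuple))  # (97, 98, 99)
```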
class Unzip(*args, **kwargs)[source]

Bases: nutsflow.base.NutSink

iterable >> Unzip(container=None)

Same as izip(*iterable) but returns iterators for container=None

>>> [(1, 2, 3), (4, 5, 6)] >> Unzip(tuple) >> Collect()
[(1, 4), (2, 5), (3, 6)]
Parameters:
  • iterable (iterable) – Any iterable, e.g. list, range, …
  • container (container) – If not none, unzipped results are collected in the provided container, eg. list, tuple, set
Returns:

Unzip iterable.

Return type:

iterator over iterators

__rrshift__(iterable)
class WriteCSV(filepath, cols=None, skipheader=0, fmtfunc=<function WriteCSV.<lambda>>, **kwargs)[source]

Bases: nutsflow.base.NutSink

Write data to a CSV file using Python’s CSV writer. See: https://docs.python.org/2/library/csv.html

__init__(filepath, cols=None, skipheader=0, fmtfunc=<function WriteCSV.<lambda>>, **kwargs)[source]

WriteCSV(filepath, cols, skipheader, fmtfunc, **kwargs)

Write data in Comma Separated Values format (CSV) and other formats to file. Tab Separated Values (TSV) files can be written by specifying a different delimiter. Note that the tab delimiter is written as '\t' in code (the docstring source escapes it as '\\t'). See unit tests.

Also see https://docs.python.org/2/library/csv.html and ReadCSV.

>>> import os
>>> filepath = 'tests/data/temp_out.csv'
>>> with WriteCSV(filepath) as writer:
...     range(10) >> writer
>>> os.remove(filepath)
>>> with WriteCSV(filepath, cols=(1,0)) as writer:
...     [(1,2), (3,4)] >> writer
>>> os.remove(filepath)
>>> filepath = 'tests/data/temp_out.tsv'
>>> with WriteCSV(filepath, delimiter='\t') as writer:
...     [[1,2], [3,4]] >> writer
>>> os.remove(filepath)
Parameters:
  • filepath (string) – Path to file in CSV format.
  • cols (tuple) – Indices of the columns to write. If None all columns are written.
  • skipheader (int) – Number of header rows to skip.
  • fmtfunc (function) – Function to apply to the elements of each row.
  • kwargs (kwargs) – Keyword arguments for Python’s CSV writer. See https://docs.python.org/2/library/csv.html
__rrshift__(iterable)[source]

Write elements of iterable to file

close()[source]

Close writer
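
Since keyword arguments are passed on to Python's CSV writer, the delimiter behaves as in the csv module. The following standard-library snippet writes to an in-memory buffer instead of a file to show the tab delimiter in code:

```python
import csv
import io

buffer = io.StringIO()
writer = csv.writer(buffer, delimiter='\t')  # a real tab character
writer.writerows([[1, 2], [3, 4]])

tsv_text = buffer.getvalue()
print(repr(tsv_text))  # '1\t2\r\n3\t4\r\n'
```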

nutsflow.source module

class Empty[source]

Bases: nutsflow.base.NutSource

Return empty iterable.

>>> from nutsflow import Collect
>>> Empty() >> Collect()
[]
Returns:Empty iterator
Return type:iterator
class Enumerate(start=0[, step])[source]

Bases: nutsflow.base.NutSource

Return increasing integers. See itertools.count

>>> from nutsflow import Take, Collect
>>> Enumerate() >> Take(3) >> Collect()
[0, 1, 2]
>>> Enumerate(1, 2) >> Take(3) >> Collect()
[1, 3, 5]
Parameters:
  • start (int) – Start of integer sequence
  • step (int) – Step of sequence
Returns:

Increasing integers.

Return type:

iterable over int

class Product(*iterables[, repeat])[source]

Bases: nutsflow.base.NutSource

Return cartesian product of input iterables.

>>> from nutsflow import Collect
>>> Product([1, 2], [3, 4]) >> Collect()
[(1, 3), (1, 4), (2, 3), (2, 4)]
>>> Product('ab', range(3)) >> Collect()
[('a', 0), ('a', 1), ('a', 2), ('b', 0), ('b', 1), ('b', 2)]
>>> Product([1, 2, 3], repeat=2) >> Collect()
[(1, 1), (1, 2), (1, 3), (2, 1), (2, 2), (2, 3), (3, 1), (3, 2), (3, 3)]
Parameters:
  • iterables (iterables) – Collections of iterables to create cartesian product from.
  • repeat (int) – Repeat a single iterable ‘repeat’ times, e.g. Product([1,2], [1,2]) is equal to Product([1,2], repeat=2)
Returns:

cartesian product

Return type:

iterator over tuples
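
The repeat parameter has the same meaning as in itertools.product, which can be checked directly with the standard library:

```python
from itertools import product

explicit = list(product([1, 2], [1, 2]))
repeated = list(product([1, 2], repeat=2))

print(explicit == repeated)  # True
print(repeated)              # [(1, 1), (1, 2), (2, 1), (2, 2)]
```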

class Range(*args, **kwargs)[source]

Bases: nutsflow.base.NutSource

Range of numbers. Similar to range() but returns iterator that depletes.

__init__(*args, **kwargs)[source]

Range(start [,end [, step]])

Return range of integers.

>>> from nutsflow import Collect
>>> Range(4) >> Collect()
[0, 1, 2, 3]
>>> Range(1, 5) >> Collect()
[1, 2, 3, 4]
Parameters:
  • start (int) – Start of range.
  • end (int) – End of range. Not inclusive. Optional.
  • step (int) – Step size. Optional.
Returns:

Range of integers.

Return type:

iterable over int

class ReadCSV(filepath, columns=None, skipheader=0, fmtfunc=None, **kwargs)[source]

Bases: nutsflow.base.NutSource

Read data from a CSV file using Python’s CSV reader. See: https://docs.python.org/2/library/csv.html

__init__(filepath, columns=None, skipheader=0, fmtfunc=None, **kwargs)[source]

ReadCSV(filepath, columns, skipheader, fmtfunc, **kwargs)

Read data in Comma Separated Values format (CSV) from file. See also WriteCSV. Can also read Tab Separated Values format (TSV) by providing the corresponding delimiter. Note that the tab delimiter is written as '\t' in code (the docstring source escapes it as '\\t'). See unit tests.

>>> from nutsflow import Collect
>>> filepath = 'tests/data/data.csv'
>>> with ReadCSV(filepath, skipheader=1) as reader:
...     reader >> Collect()
[('1', '2', '3'), ('4', '5', '6')]
>>> with ReadCSV(filepath, skipheader=1, fmtfunc=int) as reader:
...     reader >> Collect()
[(1, 2, 3), (4, 5, 6)]
>>> with ReadCSV(filepath, skipheader=1, fmtfunc=(int,str,float)) as reader:
...     reader >> Collect()
[(1, '2', 3.0), (4, '5', 6.0)]
>>> with ReadCSV(filepath, (2, 1), 1, int) as reader:
...     reader >> Collect()
[(3, 2), (6, 5)]
>>> with ReadCSV(filepath, (2, 1), 1, (str,int)) as reader:
...     reader >> Collect()
[('3', 2), ('6', 5)]
>>> with ReadCSV(filepath, 2, 1, int) as reader:
...     reader >> Collect()
[3, 6]
>>> filepath = 'tests/data/data.tsv'
>>> with ReadCSV(filepath, skipheader=1, fmtfunc=int,
...                delimiter='\t') as reader:
...     reader >> Collect()
[(1, 2, 3), (4, 5, 6)]
Parameters:
  • filepath (string) – Path to file in CSV format.
  • columns (tuple) – Indices of the columns to read. If None all columns are read.
  • skipheader (int) – Number of header lines to skip.
  • fmtfunc (tuple|function) – Function or functions to apply to the column elements of each row.
  • kwargs (kwargs) – Keyword arguments for Python’s CSV reader. See https://docs.python.org/2/library/csv.html
close()[source]

Close reader
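
The effect of passing a tuple of functions as fmtfunc can be sketched with the csv module. The sample text below is an assumption standing in for the contents of tests/data/data.csv, and the per-column conversion is an illustration rather than nutsflow's implementation.

```python
import csv
import io

# Hypothetical file contents: one header row, two data rows.
text = 'A,B,C\n1,2,3\n4,5,6\n'

reader = csv.reader(io.StringIO(text))
next(reader)  # skip one header row, like skipheader=1

converters = (int, str, float)  # one function per column
rows = [tuple(conv(cell) for conv, cell in zip(converters, row))
        for row in reader]

print(rows)  # [(1, '2', 3.0), (4, '5', 6.0)]
```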

class Repeat(value[, n])[source]

Bases: nutsflow.base.NutSource

Return given value repeatedly. See itertools.repeat

>>> from nutsflow import Take, Collect
>>> Repeat(1, 3) >> Collect()
[1, 1, 1]
>>> Repeat(1) >> Take(4) >> Collect()
[1, 1, 1, 1]
Parameters:
  • value (object) – Value to repeat
  • n (int) – Optional parameter. Value is repeated ‘n’ times.
Returns:

Iterator of repeated objects

Return type:

iterable over object

nutsflow.underscore module

Module contents