Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_map_item(self):
    """map_item('a') pulls the value stored under 'a' out of each dict."""
    as_dicts = flu(range(3)).map(lambda value: {'a': value})
    extracted = as_dicts.map_item('a').collect()
    assert extracted == [0, 1, 2]
def test_sort(self):
    """sort() with no arguments returns the items in ascending order."""
    descending = range(3, 0, -1)
    ordered = flu(descending).sort().collect()
    assert ordered == [1, 2, 3]
def test_min(self):
    """min() returns the smallest item of the iterable."""
    smallest = flu(range(3)).min()
    assert smallest == 0
def test_last(self):
    """last() returns the final item, raises on empty, or yields the default."""
    # Non-empty input: the final element is returned.
    assert flu(range(3)).last() == 2

    # Empty input without a default raises IndexError.
    empty = flu([])
    with self.assertRaises(IndexError):
        empty.last()

    # Empty input with a default returns that default instead.
    fallback = flu([]).last(default=1)
    assert fallback == 1
def test_drop_while(self):
    """drop_while() skips the leading items matching the predicate only."""
    values = [1, 2, 3, 4, 3, 2, 1]
    remaining = flu(values).drop_while(lambda value: value < 4).collect()
    # Items after the first non-matching one are kept even if they match.
    assert remaining == [4, 3, 2, 1]
def test_unique(self):
    """unique() de-duplicates by the item itself or by a supplied key."""

    class NoHash:
        """Plain holder carrying a letter and a numeric key."""

        def __init__(self, letter, keyf):
            self.letter = letter
            self.keyf = keyf

    a, b, c = NoHash('a', 1), NoHash('b', 1), NoHash('c', 2)
    items = [a, b, c]

    # No key: all three distinct instances are kept.
    assert flu(items).unique().collect() == [a, b, c]

    # Keyed on the letter: every letter differs, so nothing is dropped.
    by_letter = flu(items).unique(lambda obj: obj.letter).collect()
    assert by_letter == [a, b, c]

    # Keyed on keyf: a and b share key 1, so only the first of them remains.
    by_key = flu(items).unique(lambda obj: obj.keyf).collect()
    assert by_key == [a, c]
def test_group_by(self):
    """Cover group_by with a key, without a key, with sort on/off, and unbound."""
    # Standard usage: pair each value with its position, then group on the value.
    pairs = flu([1, 1, 1, 2, 2, 2, 2, 3]).zip(range(100))
    grouped = pairs.group_by(lambda pair: pair[0])
    ones, twos, threes = grouped.map(
        lambda group: (group[0], group[1].collect())
    ).collect()
    assert ones == (1, [(1, 0), (1, 1), (1, 2)])
    assert twos == (2, [(2, 3), (2, 4), (2, 5), (2, 6)])
    assert threes == (3, [(3, 7)])

    # No param usage: every distinct value ends up alone in its own group.
    actual = flu(range(10)).group_by().map(
        lambda group: (group[0], list(group[1]))
    ).collect()
    expected = flu(range(10)).map(lambda n: (n, [n])).collect()
    assert actual == expected

    # Sort: without sorting only adjacent equal items are grouped together.
    unsorted_groups = flu([1, 2, 1, 2]).group_by(lambda x: x, sort=False)
    assert unsorted_groups.count() == 4
    sorted_groups = flu([1, 2, 1, 2]).group_by(lambda x: x, sort=True)
    assert sorted_groups.count() == 2

    # Called through the class on a plain list, keyed on each dict's 'x' entry.
    points = [
        {'x': 1, 'y': 0},
        {'x': 4, 'y': 3},
        {'x': 1, 'y': 5},
    ]

    def key_func(u):
        return u['x']

    groups = flu.group_by(points, key=key_func, sort=True).collect()
    assert len(groups) == 2
def test_first(self):
    """first() returns the leading item, raises on empty, or yields the default."""
    # Non-empty input: the leading element is returned.
    assert flu(range(3)).first() == 0

    # Empty input without a default raises IndexError.
    empty = flu([])
    with self.assertRaises(IndexError):
        empty.first()

    # Empty input with a default returns that default instead.
    fallback = flu([]).first(default=1)
    assert fallback == 1
def test_first(self):
    """Check first() on a non-empty iterable, an empty one, and with a default."""
    # NOTE(review): an identical test_first is defined earlier in the visible
    # source; if both live in the same class, this definition replaces it.
    head = flu(range(3)).first()
    assert head == 0

    # No items and no default: IndexError is expected.
    with self.assertRaises(IndexError):
        flu([]).first()

    # No items but a default supplied: the default comes back.
    assert flu([]).first(default=1) == 1
def main():
    """CLI entry point: evaluate the command expression and print its result.

    Arguments are parsed from ``sys.argv[1:]``. The requested imports are
    executed, then ``_`` is bound to a ``flu`` over the input file (when one
    is given) or over standard input, with ``str.rstrip`` applied to each
    item. The command string is then evaluated and its result written to
    standard output: one line per item for iterables other than str/bytes,
    nothing for ``None``, and a single line for anything else.
    """
    args = parse_args(sys.argv[1:])
    _command = args.command
    _file = args.file
    _import = getattr(args, 'import')

    execute_imports(_import)

    # `_` and the other locals above keep their names on purpose: the bare
    # eval() below runs in this frame and can see them.
    if not _file:
        # Restore default SIGPIPE handling so a closed downstream pipe does
        # not surface as a Broken Pipe exception.
        signal(SIGPIPE, SIG_DFL)
        _ = flu(sys.stdin).map(str.rstrip)
    else:
        _ = flu(read_file(_file)).map(str.rstrip)

    # NOTE(security): eval() executes the expression from the command line
    # as arbitrary Python; only run commands from a trusted source.
    pipeline = eval(_command)

    if pipeline is None:
        return

    is_text = isinstance(pipeline, (str, bytes))
    if is_text or not hasattr(pipeline, "__iter__"):
        # Scalars and text are printed as a single line.
        sys.stdout.write(str(pipeline) + "\n")
        return

    for item in pipeline:
        sys.stdout.write(str(item) + "\n")