Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _chain(self, ds=None):
    """Thread ``ds`` through every link of this flow and return the result.

    Each link yielded by ``self._preprocess_chain()`` is applied to the
    value produced by the previous link, in order:

    * a nested ``Flow`` is chained recursively (it numbers its own links
      starting again from 1);
    * a ``DataStreamProcessor`` is called directly with the current ``ds``;
    * a plain function must take exactly one parameter named ``row``,
      ``rows`` or ``package``; the name selects which helper wraps it;
    * any other ``Iterable`` is wrapped with ``iterable_loader``.

    Links matching none of the above are skipped without error.

    Args:
        ds: Value handed to the first link; ``None`` by default.

    Returns:
        Whatever the last applied link returned, or ``ds`` unchanged when
        no link was applied.

    Raises:
        AssertionError: If a plain function does not have exactly one
            parameter named ``row``, ``rows`` or ``package``.
    """
    # NOTE(review): imported locally rather than at module level --
    # presumably to avoid a circular import; confirm before moving it.
    from ..helpers import datapackage_processor, rows_processor, row_processor, iterable_loader

    # The name of a function's single parameter selects its wrapper.
    wrappers_by_param = {
        'row': row_processor,
        'rows': rows_processor,
        'package': datapackage_processor,
    }

    for position, link in enumerate(self._preprocess_chain(), start=1):
        if isinstance(link, Flow):
            ds = link._chain(ds)
        elif isinstance(link, DataStreamProcessor):
            ds = link(ds, position=position)
        elif isfunction(link):
            params = list(signature(link).parameters)
            wrapper = wrappers_by_param.get(params[0]) if len(params) == 1 else None
            if wrapper is None:
                # Raised explicitly instead of ``assert False`` so the check
                # still fires under ``python -O``; the exception type and
                # message are the same as before.
                raise AssertionError('Failed to parse function signature %r' % params)
            ds = wrapper(link)(ds, position=position)
        elif isinstance(link, Iterable):
            ds = iterable_loader(link)(ds, position=position)
    return ds