Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __iter__(self):
    """Iterate over ``self.table`` with ``self.prefix`` added to each field.

    The first row of the wrapped table is treated as the header: every
    field name is converted with ``text_type`` and has the (likewise
    converted) prefix prepended.  All following rows are passed through
    unchanged.

    If the wrapped table yields nothing at all (not even a header row),
    this iterator yields nothing.
    """
    it = iter(self.table)
    try:
        hdr = next(it)
    except StopIteration:
        # Completely empty table.  Letting StopIteration escape from a
        # generator body is converted to RuntimeError by PEP 479, so end
        # the iteration explicitly instead.
        return
    # the prefix is the same for every field, so convert it only once
    prefix = text_type(self.prefix)
    yield tuple(prefix + text_type(f) for f in hdr)
    for row in it:
        yield row
def itercrossjoin(sources, prefix):
    """Yield the rows of the cartesian product of several tables.

    The first row yielded is the combined header: the fields reported by
    ``header(source)`` for every source, concatenated in source order.
    When ``prefix`` is truthy, each field name is rewritten as
    ``'<n>_<field>'`` where ``n`` is the one-based position of the source
    it came from.

    Every subsequent row is the concatenation of one row taken from each
    ``data(source)``, covering all combinations in the order produced by
    ``itertools.product``.
    """
    # assemble the combined header
    combined_header = []
    for position, source in enumerate(sources, start=1):
        names = header(source)
        if prefix:
            # one-based source number keeps fields from different tables apart
            names = [text_type(position) + '_' + text_type(name)
                     for name in names]
        combined_header.extend(names)
    yield tuple(combined_header)

    # emit every combination of data rows, flattened into a single tuple
    row_sources = [data(source) for source in sources]
    for combination in itertools.product(*row_sources):
        yield tuple(itertools.chain.from_iterable(combination))
def __iter__(self):
    """Yield a header row followed by ``self.numrows`` generated data rows.

    ``self.fields`` is copied up front, so changes made to it while
    iterating do not affect the rows being produced.  Its keys (converted
    with ``text_type``) form the header, and each of its values is called
    with no arguments to produce one cell of every data row.

    The ``random`` module is re-seeded with ``self.seed`` before any row is
    generated.  When ``self.wait`` is truthy, the iterator sleeps for that
    many seconds before producing each data row.
    """
    row_count = self.numrows
    rng_seed = self.seed
    generators = self.fields.copy()

    # N.B., re-seed so that repeated iteration produces the same data
    random.seed(rng_seed)

    # header row
    yield tuple(text_type(name) for name in generators.keys())

    # data rows
    for _ in xrange(row_count):
        if self.wait:
            # artificial delay
            time.sleep(self.wait)
        yield tuple(make_value() for make_value in generators.values())
elif isinstance(field, int) and field < len(flds):
field_index = field
field = flds[field_index]
else:
raise ArgumentError('field invalid: must be either field name or index')
# determine output fields
outhdr = list(flds)
if not include_original:
outhdr.remove(field)
if isinstance(newfields, (list, tuple)):
outhdr.extend(newfields)
nunpack = len(newfields)
elif isinstance(newfields, int):
nunpack = newfields
newfields = [text_type(field) + text_type(i+1) for i in range(newfields)]
outhdr.extend(newfields)
elif newfields is None:
nunpack = 0
else:
raise ArgumentError('newfields argument must be list or tuple of field '
'names, or int (number of values to unpack)')
yield tuple(outhdr)
# construct the output data
for row in it:
value = row[field_index]
if include_original:
out_row = list(row)
else:
out_row = [v for i, v in enumerate(row) if i != field_index]
nvals = len(value)
# Module-level configuration values.
# fetch the petl package object from sys.modules (KeyError if it has not
# been imported yet)
petl = sys.modules['petl']
# the module object for this module itself
thismodule = sys.modules[__name__]
# NOTE(review): presumably the default size of some cache; the code that
# reads this value is not visible here -- confirm
cachesize = 100
# NOTE(review): presumably the function used by default to render a table
# as text -- confirm against callers
representation = petl.look
# set True to display field indices
# (repr_html falls back to this when its index_header argument is None)
repr_index_header = False
# set to str or repr for different behaviour
repr_html_value = text_type
# default limit for html table representation
repr_html_limit = 5
def repr_html(tbl, limit=None, index_header=None, representation=text_type,
caption=None, encoding='utf-8'):
# add column indices to header?
if index_header is None:
index_header = repr_index_header # use default
if index_header:
indexed_header = [u'%s|%s' % (i, f)
for (i, f) in enumerate(petl.util.header(tbl))]
target = petl.transform.setheader(tbl, indexed_header)
def iterproblems(table, constraints, expected_header):
outhdr = ('name', 'row', 'field', 'value', 'error')
yield outhdr
it = iter(table)
actual_header = next(it)
if expected_header is None:
flds = list(map(text_type, actual_header))
else:
expected_flds = list(map(text_type, expected_header))
actual_flds = list(map(text_type, actual_header))
try:
assert expected_flds == actual_flds
except Exception as e:
yield ('__header__', 0, None, None, type(e).__name__)
flds = expected_flds
local_constraints = normalize_constraints(constraints, flds)
# setup getters
for constraint in local_constraints:
if 'getter' not in constraint:
if 'field' in constraint:
# should ensure FieldSelectionError if bad field in constraint
def __iter__(self):
    """Iterate over ``self.table`` with ``self.suffix`` added to each field.

    The first row of the wrapped table is treated as the header: every
    field name is converted with ``text_type`` and has the (likewise
    converted) suffix appended.  All following rows are passed through
    unchanged.

    If the wrapped table yields nothing at all (not even a header row),
    this iterator yields nothing.
    """
    it = iter(self.table)
    try:
        hdr = next(it)
    except StopIteration:
        # Completely empty table.  Letting StopIteration escape from a
        # generator body is converted to RuntimeError by PEP 479, so end
        # the iteration explicitly instead.
        return
    # the suffix is the same for every field, so convert it only once
    suffix = text_type(self.suffix)
    yield tuple(text_type(f) + suffix for f in hdr)
    for row in it:
        yield row
def __init__(self, table, source=None, encoding=None, errors='strict',
             caption=None, vrepr=text_type, lineterminator='\n',
             index_header=False, tr_style=None, td_styles=None,
             truncate=None):
    """Record the constructor arguments on the instance.

    Each argument is stored unchanged under an attribute of the same
    name; no validation, conversion or I/O is performed here.
    """
    # wrapped table and the ``source`` argument
    self.table, self.source = table, source
    # text encoding settings
    self.encoding, self.errors = encoding, errors
    # rendering options
    self.caption, self.vrepr = caption, vrepr
    self.lineterminator, self.index_header = lineterminator, index_header
    # styling and truncation options
    self.tr_style, self.td_styles = tr_style, td_styles
    self.truncate = truncate
test = lambda r: prog.search(text_type(r[index]))
else:
# determine indices of non-key fields in the right table
# (in the output, we only include key fields from the left table - we
# don't want to duplicate fields)
rvind = [i for i in range(len(rhdr)) if i not in rkind]
rgetv = rowgetter(*rvind)
# determine the output fields
if lprefix is None:
outhdr = list(lhdr)
else:
outhdr = [(text_type(lprefix) + text_type(f)) for f in lhdr]
if rprefix is None:
outhdr.extend(rgetv(rhdr))
else:
outhdr.extend([(text_type(rprefix) + text_type(f)) for f in rgetv(rhdr)])
yield tuple(outhdr)
# define a function to join two groups of rows
def joinrows(_lrowgrp, _rrowgrp):
if _rrowgrp is None:
for lrow in _lrowgrp:
outrow = list(lrow) # start with the left row
# extend with missing values in place of the right row
outrow.extend([missing] * len(rvind))
yield tuple(outrow)
elif _lrowgrp is None:
for rrow in _rrowgrp:
# start with missing values in place of the left row
outrow = [missing] * len(lhdr)
# set key values
for li, ri in zip(lkind, rkind):