Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def itersplitdown(table, field, pattern, maxsplit, flags):
prog = re.compile(pattern, flags)
it = iter(table)
hdr = next(it)
flds = list(map(text_type, hdr))
if isinstance(field, int) and field < len(hdr):
field_index = field
field = hdr[field_index]
elif field in flds:
field_index = flds.index(field)
else:
raise ArgumentError('field invalid: must be either field name or index')
yield tuple(hdr)
for row in it:
value = row[field_index]
for v in prog.split(value, maxsplit):
yield tuple(v if i == field_index else row[i] for i in range(len(hdr)))
def open(self, mode='r'):
if not mode.startswith('r'):
raise ArgumentError('source is read-only')
f = urlopen(*self.args, **self.kwargs)
try:
yield f
finally:
f.close()
import whoosh.qparser
if not search_kwargs:
search_kwargs = dict()
if isinstance(index_or_dirname, string_types):
dirname = index_or_dirname
index = whoosh.index.open_dir(dirname,
indexname=indexname,
readonly=True)
needs_closing = True
elif isinstance(index_or_dirname, whoosh.index.Index):
index = index_or_dirname
needs_closing = False
else:
raise ArgumentError('expected string or index, found %r'
% index_or_dirname)
try:
# figure out header
hdr = tuple()
if docnum_field is not None:
hdr += (docnum_field,)
if score_field is not None:
hdr += (score_field,)
stored_names = tuple(index.schema.stored_names())
hdr += stored_names
yield hdr
# parse the query
if isinstance(query, string_types):
# normalise aggregators
for outfld in aggregation:
agg = aggregation[outfld]
if callable(agg):
aggregation[outfld] = None, agg
elif isinstance(agg, string_types):
aggregation[outfld] = agg, list # list is default
elif len(agg) == 1 and isinstance(agg[0], string_types):
aggregation[outfld] = agg[0], list # list is default
elif len(agg) == 1 and callable(agg[0]):
aggregation[outfld] = None, agg[0] # aggregate whole rows
elif len(agg) == 2:
pass # no need to normalise
else:
raise ArgumentError('invalid aggregation: %r, %r' % (outfld, agg))
# determine output header
if isinstance(key, (list, tuple)):
outhdr = list(key)
elif callable(key):
outhdr = ['key']
else:
outhdr = [key]
for outfld in aggregation:
outhdr.append(outfld)
yield tuple(outhdr)
# generate data
for k, rows in rowgroupby(it, key):
rows = list(rows) # may need to iterate over these more than once
# handle compound key
hdr += stored_names
yield hdr
# parse the query
if isinstance(query, string_types):
# search all fields by default
parser = whoosh.qparser.MultifieldParser(
index.schema.names(),
index.schema,
fieldboosts=fieldboosts
)
query = parser.parse(query)
elif isinstance(query, whoosh.query.Query):
pass
else:
raise ArgumentError(
'expected string or whoosh.query.Query, found %r' % query
)
# make a function to turn docs into tuples
astuple = operator.itemgetter(*index.schema.stored_names())
with index.searcher() as searcher:
if limit is not None:
results = searcher.search(query, limit=limit,
**search_kwargs)
else:
results = searcher.search_page(query, pagenum,
pagelen=pagelen,
**search_kwargs)
if docnum_field is None and score_field is None:
# allow for polymorphic args
if isinstance(source, string_types):
# assume source is the name of an HDF5 file, try to open it
h5file = tables.open_file(source, mode=mode)
needs_closing = True
elif isinstance(source, tables.File):
# source is an HDF5 file object
h5file = source
else:
# invalid source
raise ArgumentError('invalid source argument, expected file name or '
'tables.File object, found: %r' % source)
try:
yield h5file
finally:
if needs_closing:
h5file.close()
# does it quack like an SQLAlchemy connection?
elif _is_sqlalchemy_connection(dbo):
debug('assuming %r instance of sqlalchemy.engine.base.Connection', dbo)
_todb_sqlalchemy_connection(table, dbo, tablename, schema=schema,
commit=commit, truncate=truncate)
elif callable(dbo):
debug('assuming %r is a function returning standard DB-API 2.0 cursor '
'objects', dbo)
_todb_dbapi_mkcurs(table, dbo, tablename, schema=schema, commit=commit,
truncate=truncate)
# some other sort of duck...
else:
raise ArgumentError('unsupported database object type: %r' % dbo)
def keys_from_args(left, right, key, lkey, rkey):
if key is lkey is rkey is None:
# no keys specified, attempt natural join
lkey = rkey = natural_key(left, right)
elif key is not None and lkey is rkey is None:
# common key specified
lkey = rkey = key
elif key is None and lkey is not None and rkey is not None:
# left and right keys specified
pass
else:
raise ArgumentError(
'bad key arguments: either specify key, or specify both lkey and '
'rkey, or provide no key/lkey/rkey arguments at all (natural join)'
)
return lkey, rkey
def _get_td_css(h, v, td_styles):
# check for user-provided style
if td_styles:
if isinstance(td_styles, string_types):
return td_styles
elif callable(td_styles):
return td_styles(v)
elif isinstance(td_styles, dict):
if h in td_styles:
s = td_styles[h]
if isinstance(s, string_types):
return s
elif callable(s):
return s(v)
else:
raise ArgumentError('expected string or callable, got %r'
% s)
else:
raise ArgumentError('expected string, callable or dict, got %r'
% td_styles)
# fall back to default style
if isinstance(v, numeric_types) and not isinstance(v, bool):
return 'text-align: right'
else:
return ''