Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_standalone_report_stream_invalid(self):
filepath = os.path.join(self.data_dir, 'valid.csv')
report_stream = io.BufferedReader(io.BytesIO())
self.assertRaises(exceptions.ProcessorBuildError,
processors.StructureProcessor,
report_stream=report_stream)
except csv.Error:
dialect = csv.excel
def iterenc_utf8(data):
for line in data:
yield line.encode('utf-8')
iter = chain(first_lines, data)
iter = iterenc_utf8(iter)
csv.field_size_limit(20000000)
try:
reader = csv.reader(iter, dialect=dialect, **kwargs)
for row in reader:
yield [str(cell, 'utf-8') for cell in row]
except TypeError as e:
raise exceptions.DataSourceMalformatedError(msg=e.args[0], file_format='csv')
def register_check(self, func, name, type=None, context=None, position=None):
check = {
'func': func,
'name': name,
'type': type,
'context': context,
}
# Validate check
error = spec['errors'].get(name)
if error:
if (check['type'] is not None or check['context'] is not None):
message = 'Check "%s" is a part of the spec but type/context is incorrect'
raise exceptions.GoodtablesException(message % name)
check['type'] = error['type']
check['context'] = error['context']
elif not error and type != 'custom':
message = 'Check "%s" is not a part of the spec should have type "custom"'
raise exceptions.GoodtablesException(message % name)
# Validate position
if position:
try:
position = position.split(':', 1)
assert position[0] in ['before', 'after']
assert self.__checks.get(position[1])
except (TypeError, AssertionError):
message = 'Check "%s" has been registered at invalid position "%s"'
raise exceptions.GoodtablesException(message % (name, position))
def _stream_from_url(self, url):
"""Return a seekable and readable stream from a URL."""
stream = io.BufferedRandom(io.BytesIO())
valid_url = helpers.make_valid_url(url)
try:
document = compat.urlopen(valid_url)
except compat.HTTPError as e:
raise exceptions.DataSourceHTTPError(status=e.getcode())
stream.write(document.read())
stream.seek(0)
return stream
def __init__(self, column, average='mean', interval=3, **options):
# Set attributes
self.__column = column
self.__interval = interval
self.__column_cells = []
self.__average_function = _AVERAGE_FUNCTIONS.get(average)
self.__code = 'deviated-value'
self.__cell = None
# Validate average
if not self.__average_function:
message = 'Deviated value check supports only this average functions: %s'
message = message % ', '.join(_AVERAGE_FUNCTIONS.keys())
raise exceptions.GoodtablesError(message)
stream = codecs.iterdecode([stream], encoding, self.decode_strategy)
elif isinstance(stream, compat.str):
_stream = io.StringIO()
_stream.write(stream)
stream = _stream
stream.seek(0)
else:
stream = codecs.iterdecode(stream, encoding, self.decode_strategy)
try:
for line in stream:
recoded = line.encode(self.DEFAULT_ENCODING).decode(self.DEFAULT_ENCODING)
textstream.write(recoded)
except UnicodeDecodeError as e:
raise exceptions.DataSourceDecodeError
textstream.seek(0)
return textstream
(not including self if handler is a method)
"""
if handler is None:
return True
else:
if not callable(handler):
raise exceptions.InvalidHandlerError
if inspect.ismethod(handler):
# extra arg for self
argcount += 1
spec = inspect.getargspec(handler)
if not len(spec[0]) == argcount:
raise exceptions.InvalidHandlerError