Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_invalid(self):
for json in INVALID_JSONS:
# Yajl1 doesn't complain about additional data after the end
# of a parsed object. Skipping this test.
if self.__class__.__name__ == 'YajlParse' and json == YAJL1_PASSING_INVALID:
continue
with self.assertRaises(common.JSONError) as cm:
list(self.backend.basic_parse(BytesIO(json)))
'''
Iterator yielding unprefixed events.
Parameters:
- file: a readable file-like object with JSON input
'''
lexer = iter(Lexer(file, buf_size))
for value in parse_value(lexer):
yield value
try:
next(lexer)
except StopIteration:
pass
else:
raise common.JSONError('Additional data')
callbacks = Callbacks(*[callback(*data) for data in _callback_data])
config = Config(allow_comments, check_utf8)
handle = yajl.yajl_alloc(byref(callbacks), byref(config), None, None)
try:
while True:
buffer = f.read(buf_size)
if buffer:
result = yajl.yajl_parse(handle, buffer, len(buffer))
else:
result = yajl.yajl_parse_complete(handle)
if result == YAJL_ERROR:
perror = yajl.yajl_get_error(handle, 1, buffer, len(buffer))
error = cast(perror, c_char_p).value
yajl.yajl_free_error(handle, perror)
exception = common.IncompleteJSONError if result == YAJL_INSUFFICIENT_DATA else common.JSONError
raise common.JSONError(error)
if not buffer and not events:
if result == YAJL_INSUFFICIENT_DATA:
raise common.IncompleteJSONError('Incomplete JSON data')
break
for event in events:
yield event
events = []
finally:
yajl.yajl_free(handle)
callbacks = Callbacks(*[callback(*data) for data in _callback_data])
config = Config(allow_comments, check_utf8)
handle = yajl.yajl_alloc(byref(callbacks), byref(config), None, None)
try:
while True:
buffer = f.read(buf_size)
if buffer:
result = yajl.yajl_parse(handle, buffer, len(buffer))
else:
result = yajl.yajl_parse_complete(handle)
if result != YAJL_OK:
perror = yajl.yajl_get_error(handle, 1, buffer, len(buffer))
error = cast(perror, c_char_p).value
yajl.yajl_free_error(handle, perror)
raise common.JSONError(error.decode('utf-8'))
if not buffer and not events:
break
for event in events:
yield event
events = []
finally:
yajl.yajl_free(handle)
with transaction.atomic():
prev_probe_exists = Probe.objects.filter(id=prev_probe_id).exists()
if prev_probe_exists:
probe.previous_id = prev_probe_id
if audit_id:
probe.audit_id = audit_id
# Build schema path, load schema json, and create validator
schema_object = get_schema_object(json_schema_name)
if schema_object:
validator = Draft4Validator(schema_object)
if json_object and validator:
try:
is_valid = validator.is_valid(json_object)
except (JSONError, IncompleteJSONError) as e:
logger.exception(e)
returnval.add_error(e)
if not is_valid:
# Save up to SCHEMA_ERROR_LIMIT errors from schema validation
error_iter = islice(validator.iter_errors(json_object), SCHEMA_ERROR_LIMIT)
for e in error_iter:
returnval.add_error(e)
if probe:
# Record results of validation into probe
probe.result['object_position'] = taskarg.get('object_position', None)
probe.result['object_identifier'] = json_object.get('identifier', None)
probe.result['object_info'] = {key: json_object.get(key, None) for key in DATASET_DESCRIPTIVE_KEYS}
probe.result['is_valid_schema_instance'] = is_valid
# Record errors and save probe
with transaction.atomic():
probe.errors.extend(returnval.errors)
'''
Iterator yielding unprefixed events.
Parameters:
- response: a stream response from requests
'''
lexer = iter(IncrementalJsonParser.lexer(response, buf_size))
for value in ijson.backend.parse_value(lexer):
yield value
try:
next(lexer)
except StopIteration:
pass
else:
raise ijson.common.JSONError('Additional data')