Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __assert_raises(self, dp_fn):
    """Check that validating ``dp_fn`` raises ``ValidationError``.

    A fresh ``validation.Validator`` is created inside the guarded block, so
    an error raised while constructing it is treated the same way as one
    raised by ``validate`` itself.
    """
    expected_error = ValidationError
    with pytest.raises(expected_error):
        validator = validation.Validator()
        validator.validate(dp_fn)
def test_validate_raises_validation_error_if_invalid():
    """``Package.validate`` must raise when a required property is absent."""
    # The schema demands a 'name' property; the package is built from the
    # schema alone, with no descriptor argument supplied.
    required_fields = ['name']
    schema = dict(properties=dict(name={}), required=required_fields)
    package = Package(schema=schema)
    expected_error = exceptions.ValidationError
    with pytest.raises(expected_error):
        package.validate()
def test_init_raises_if_path_doesnt_exist():
    """Constructing a ``Profile`` from an unknown schema name must fail."""
    missing_schema = 'inexistent_schema.json'
    with pytest.raises(exceptions.ValidationError):
        Profile(missing_schema)
ValidationError: raises if not valid
# Returns
bool: returns True if valid
"""
# Collect errors
errors = []
for error in self._validator.iter_errors(descriptor):
if isinstance(error, jsonschema.exceptions.ValidationError):
message = str(error.message)
if six.PY2:
message = message.replace('u\'', '\'')
descriptor_path = '/'.join(map(str, error.path))
profile_path = '/'.join(map(str, error.schema_path))
error = exceptions.ValidationError(
'Descriptor validation error: %s '
'at "%s" in descriptor and '
'at "%s" in profile'
% (message, descriptor_path, profile_path))
errors.append(error)
# Raise error
if errors:
message = 'There are %s validation errors (see exception.errors)' % len(errors)
raise exceptions.ValidationError(message, errors=errors)
return True
def _check_schema(self):
    """Verify that the stored schema is itself a valid JSON Schema.

    # Raises
        ValidationError: if the validator rejects the schema; the original
            ``jsonschema`` ``SchemaError`` is chained as the cause.
    """
    try:
        self._validator.check_schema(self._schema)
    except jsonschema.exceptions.SchemaError as schema_error:
        # Translate the library error into the package's own exception type
        # while keeping the original attached as the cause.
        wrapped = exceptions.ValidationError(
            'Profile is invalid: %s' % schema_error)
        six.raise_from(wrapped, schema_error)
else:
req = requests.get(schema)
req.raise_for_status()
the_schema = req.json()
except (IOError, ValueError, requests.exceptions.RequestException) as ex:
message = 'Unable to load profile at "{0}"'
six.raise_from(
exceptions.ValidationError(message.format(schema)),
ex
)
elif isinstance(the_schema, dict):
the_schema = copy.deepcopy(the_schema)
else:
message = 'Schema must be a "dict", but was a "{0}"'
raise exceptions.ValidationError(message.format(type(the_schema).__name__))
return the_schema
if isinstance(schema, six.string_types):
try:
the_schema = registry.get(schema)
if not the_schema:
if os.path.isfile(schema):
with open(schema, 'r') as f:
the_schema = json.load(f)
else:
req = requests.get(schema)
req.raise_for_status()
the_schema = req.json()
except (IOError, ValueError, requests.exceptions.RequestException) as ex:
message = 'Unable to load profile at "{0}"'
six.raise_from(
exceptions.ValidationError(message.format(schema)),
ex
)
elif isinstance(the_schema, dict):
the_schema = copy.deepcopy(the_schema)
else:
message = 'Schema must be a "dict", but was a "{0}"'
raise exceptions.ValidationError(message.format(type(the_schema).__name__))
return the_schema
# Rebuild the state derived from the current descriptor: expand it, take a
# deep copy as the "next" descriptor, instantiate the profile it names,
# validate it against that profile, and refresh the cached resources.
# NOTE(review): indentation was lost in this excerpt; code lines are kept
# exactly as found.
def __build(self):
# Process descriptor
expand = helpers.expand_package_descriptor
self.__current_descriptor = expand(self.__current_descriptor)
# Independent deep copy, stored separately from the current descriptor.
self.__next_descriptor = deepcopy(self.__current_descriptor)
# Instantiate profile
# The profile is looked up from the descriptor's 'profile' key (None if absent).
self.__profile = Profile(self.__current_descriptor.get('profile'))
# Validate descriptor
try:
self.__profile.validate(self.__current_descriptor)
# Reached only when validate() did not raise: reset the recorded errors.
self.__errors = []
except exceptions.ValidationError as exception:
# Keep the errors carried by the exception; re-raise only in strict mode.
self.__errors = exception.errors
if self.__strict:
raise exception
# Update resource
descriptors = self.__current_descriptor.get('resources', [])
# Drop cached resources beyond the number of resource descriptors.
self.__resources = self.__resources[:len(descriptors)]
# zip_longest pads the shorter sequence with None, so a descriptor with no
# cached counterpart is paired with resource=None.
iterator = enumerate(six.moves.zip_longest(list(self.__resources), descriptors))
for index, (resource, descriptor) in iterator:
# Build a new Resource when none is cached, when its descriptor differs, or
# when its schema has foreign keys (presumably because those depend on other
# resources -- TODO confirm).
if (not resource or resource.descriptor != descriptor or
(resource.schema and resource.schema.foreign_keys)):
updated_resource = Resource(descriptor,
strict=self.__strict,
base_path=self.__base_path,
storage=self.__storage,
package=self)
# NOTE(review): updated_resource is not used within the visible lines; the
# excerpt appears to be truncated here.
def schema_errors(self):
    """Yield the message of each schema error, if validation fails.

    Runs the parent class's ``validate``; when it raises
    ``ValidationError``, iterates ``self.iter_errors()`` and yields every
    error's ``message``. Nothing is yielded when validation succeeds.
    """
    parent = super(Milestone, self)
    try:
        parent.validate()
    except ValidationError:
        found = self.iter_errors()
        for problem in found:
            yield problem.message