Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_create_sub_class_exceptions(self):
u_e = errors.UnknownError()
assert u_e.retcode == 1
s_e = errors.SchemaError()
assert s_e.retcode == 2
c_e = errors.CoreError()
assert c_e.retcode == 3
r_e = errors.RuleError()
assert r_e.retcode == 4
sc_e = errors.SchemaConflict()
assert sc_e.retcode == 5
west_url = WEST_URL_DEFAULT
west_rev = WEST_REV_DEFAULT
west_dir = os.path.join(directory, WEST_DIR, WEST)
# Parse the manifest and look for a section named "west"
with open(manifest_file, 'r') as f:
data = yaml.safe_load(f.read())
if 'west' in data:
wdata = data['west']
try:
pykwalify.core.Core(
source_data=wdata,
schema_files=[_SCHEMA_PATH]
).validate()
except pykwalify.errors.SchemaError as e:
sys.exit("Error: Failed to parse manifest file '{}': {}"
.format(manifest_file, e))
if 'url' in wdata:
west_url = wdata['url']
if 'revision' in wdata:
west_rev = wdata['revision']
print("cloning {} at revision {}".format(west_url, west_rev))
clone('west repository', west_url, west_rev, west_dir)
# Make sure west has a manifest-rev branch, like any project.
subprocess.check_call(['git', 'update-ref', 'refs/heads/manifest-rev',
'HEAD'],
cwd=west_dir)
clone('manifest repository', manifest_url, manifest_rev,
os.path.join(directory, WEST_DIR, MANIFEST))
# Parse the manifest and look for a section named "west"
manifest_file = os.path.join(directory, WEST_DIR, MANIFEST, 'default.yml')
with open(manifest_file, 'r') as f:
data = yaml.safe_load(f.read())
if 'west' in data:
wdata = data['west']
try:
pykwalify.core.Core(
source_data=wdata,
schema_files=[_SCHEMA_PATH]
).validate()
except pykwalify.errors.SchemaError as e:
sys.exit("Error: Failed to parse manifest file '{}': {}"
.format(manifest_file, e))
if 'url' in wdata:
west_url = wdata['url']
if 'revision' in wdata:
west_rev = wdata['revision']
print("cloning {} at revision {}".format(west_url, west_rev))
clone('west repository', west_url, west_rev,
os.path.join(directory, WEST_DIR, WEST))
# Create an initial configuration file
config_path = os.path.join(directory, WEST_DIR, 'config')
update_conf(config_path, manifest_url, manifest_rev)
# from a manifest that was copy/pasted into a self import
# location.
if not os.path.exists(spec_file):
continue
# Load the spec file and check the schema.
with open(spec_file, 'r') as f:
try:
commands_spec = yaml.safe_load(f.read())
except yaml.YAMLError as e:
raise ExtensionCommandError from e
try:
pykwalify.core.Core(
source_data=commands_spec,
schema_files=[_EXT_SCHEMA_PATH]).validate()
except pykwalify.errors.SchemaError as e:
raise ExtensionCommandError from e
for commands_desc in commands_spec['west-commands']:
ret.extend(_ext_specs_from_desc(project, commands_desc))
return ret
def validate(yml_rule: Dict, schema_path: str):
"""
This uses pykwalify to validate the given Rule YAML file against the Rule
schema.
"""
try:
c = Core(source_data=yml_rule, schema_files=[schema_path], fix_ruby_style_regex=True)
c.validate(raise_exception=True)
except pykwalify.errors.SchemaError as e:
raise click.ClickException(e.msg)
def validate_config(config_file):
''' Validates config against the schema
And also validates the lambdas in the subscriptions that they are defined
Raises ConfigValidationError if not valid.
'''
schema_path = resource_filename("xflow", "schema.yaml")
c = Core(source_file=config_file, schema_files=[schema_path])
try:
config = c.validate(raise_exception=True)
except pykwalify.errors.SchemaError as ex:
raise ConfigValidationError(str(ex))
subscriptions = config.get('subscriptions') or []
lambdas = config.get('lambdas') or []
lambda_names = [l['name'] for l in lambdas]
subscription_events = []
for ss in subscriptions:
event_name = ss['event']
subscription_events.append(event_name)
subscribers = ss.get('subscribers') or []
for s in subscribers:
if s not in lambda_names:
raise ConfigValidationError("Lambda not defined for subscriber %s" % s)
workflows = config.get('workflows') or []
for w in workflows:
# by explicitly allowing:
#
# version: 1.0
min_version_str = str(data['version'])
min_version = parse_version(min_version_str)
if min_version > _SCHEMA_VER:
raise ManifestVersionError(min_version_str)
elif min_version < _EARLIEST_VER:
raise MalformedManifest(
f'invalid version {min_version_str}; '
f'lowest schema version is {_EARLIEST_VER_STR}')
try:
pykwalify.core.Core(source_data=data,
schema_files=[_SCHEMA_PATH]).validate()
except pykwalify.errors.SchemaError as se:
raise MalformedManifest(se.msg) from se