Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import copy
import functools
import logging
import os
import tempfile
from future.utils import raise_from
import pykwalify
from pykwalify import core
import yaml
from tavern.plugins import load_plugins
from tavern.util.exceptions import BadSchemaError
from tavern.util.loader import IncludeLoader, load_single_document_yaml
# Rebind the ``safe_load`` attribute of the ``yaml`` object reachable through
# pykwalify's ``core`` module to ``yaml.load`` pre-bound with IncludeLoader,
# so any later ``core.yaml.safe_load(...)`` call parses with that loader.
# NOTE(review): this is a module-level monkeypatch, and the replacement is
# ``yaml.load`` with a custom Loader rather than PyYAML's safe loader --
# confirm what IncludeLoader permits before feeding it untrusted YAML, and
# whether ``core.yaml`` is the shared ``yaml`` module (which would make the
# patch visible to every other user of ``yaml.safe_load``).
core.yaml.safe_load = functools.partial(yaml.load, Loader=IncludeLoader)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
class SchemaCache(object):
    """Caches loaded schemas"""

    def __init__(self):
        # Maps schema filename -> document returned by
        # load_single_document_yaml for that file.
        self._loaded = {}

    def _load_base_schema(self, schema_filename):
        """Return the schema loaded from ``schema_filename``, reading it once.

        The first request for a filename loads it with
        ``load_single_document_yaml`` and stores the result; later requests
        return the stored object without touching the file again.

        Args:
            schema_filename: path of the schema file; also the cache key.

        Returns:
            The cached schema document (the same object on every call).
        """
        try:
            return self._loaded[schema_filename]
        except KeyError:
            pass

        # Cache miss: load, remember, and hand the schema back to the caller.
        # Previously the miss path stored the schema but fell off the end of
        # the function, so the first lookup of every file returned None.
        schema = load_single_document_yaml(schema_filename)
        self._loaded[schema_filename] = schema
        return schema
def process_module(module):
    """Return the build metadata for the module directory ``module``.

    A directory counts as a module when ``zephyr/module.yml`` exists as a
    file, or when both ``zephyr/CMakeLists.txt`` and ``zephyr/Kconfig`` do.

    * With ``zephyr/module.yml``: the file is parsed with ``yaml.safe_load``
      and validated by pykwalify against the module-level ``schema``; the
      parsed data is returned. A ``SchemaError`` ends the process through
      ``sys.exit`` with a message naming the file.
    * With only the CMakeLists.txt/Kconfig pair: a default ``build``
      mapping pointing at the ``zephyr`` directory is returned.
    * Otherwise: ``None``.
    """
    root = PurePath(module)
    yml_pure = root.joinpath('zephyr/module.yml')
    yml_file = Path(yml_pure)

    if yml_file.is_file():
        with yml_file.open('r') as stream:
            meta = yaml.safe_load(stream.read())

        try:
            validator = pykwalify.core.Core(source_data=meta,
                                            schema_data=schema)
            validator.validate()
        except pykwalify.errors.SchemaError as err:
            message = 'ERROR: Malformed "build" section in file: {}\n{}'
            sys.exit(message.format(yml_pure.as_posix(), err))

        return meta

    # No module.yml: fall back to the conventional file pair.
    required = ('zephyr/CMakeLists.txt', 'zephyr/Kconfig')
    if all(Path(root.joinpath(name)).is_file() for name in required):
        return {'build': {'cmake': 'zephyr', 'kconfig': 'zephyr/Kconfig'}}

    return None
# Clone the west source code and the manifest into west/. Git will create
# the west/ directory if it does not exist.
# This first call clones the manifest repository; the west repository itself
# is cloned by the second clone() call further down.
clone('manifest repository', manifest_url, manifest_rev,
os.path.join(directory, WEST_DIR, MANIFEST))
# Parse the manifest and look for a section named "west"
manifest_file = os.path.join(directory, WEST_DIR, MANIFEST, 'default.yml')
with open(manifest_file, 'r') as f:
data = yaml.safe_load(f.read())
# The "west" section is optional. When present it is validated against the
# schema file at _SCHEMA_PATH before its keys are read.
if 'west' in data:
wdata = data['west']
try:
pykwalify.core.Core(
source_data=wdata,
schema_files=[_SCHEMA_PATH]
).validate()
except pykwalify.errors.SchemaError as e:
# A schema violation terminates the process, naming the manifest file.
sys.exit("Error: Failed to parse manifest file '{}': {}"
.format(manifest_file, e))
# 'url' and 'revision' are each optional. When a key is absent the matching
# variable keeps the value it had before this fragment (assigned outside the
# visible code -- presumably defaults; confirm against the enclosing function).
if 'url' in wdata:
west_url = wdata['url']
if 'revision' in wdata:
west_rev = wdata['revision']
print("cloning {} at revision {}".format(west_url, west_rev))
clone('west repository', west_url, west_rev,
os.path.join(directory, WEST_DIR, WEST))
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import yaml
import sys
from pykwalify import core as pykwalify_core
from pykwalify import errors as pykwalify_errors
# Command-line check of a config file against a pykwalify schema.
# argv[1] is the YAML schema file; argv[2] is handed to pykwalify's Core as
# its first positional argument (the source file to validate).
# Exits with status 0 on success; on a schema violation a message is printed
# and an Exception carrying the pykwalify error text is raised.

# Read the schema inside a context manager so the file handle is closed
# as soon as parsing finishes instead of staying open for the whole run.
with open(sys.argv[1], 'r') as stream:
    schema = yaml.safe_load(stream)

check = pykwalify_core.Core(sys.argv[2], schema_data=schema)
try:
    check.validate(raise_exception=True)
except pykwalify_errors.SchemaError as e:
    print("Config " + sys.argv[2] + " is not valid!")
    raise Exception('File does not conform to schema: {}'.format(e))
else:
    print("Validation successful")
    # sys.exit rather than the site-provided exit(), which is not
    # guaranteed to exist (e.g. under ``python -S``).
    sys.exit(0)
f'escapes project path {project.path}')
# The project may not be cloned yet, or this might be coming
# from a manifest that was copy/pasted into a self import
# location.
# A missing spec file is therefore not an error: skip to the next iteration
# of the enclosing loop (the loop header is outside this fragment).
if not os.path.exists(spec_file):
continue
# Load the spec file and check the schema.
with open(spec_file, 'r') as f:
try:
commands_spec = yaml.safe_load(f.read())
except yaml.YAMLError as e:
# Unparseable YAML surfaces as ExtensionCommandError, chained to the
# underlying parser error.
raise ExtensionCommandError from e
try:
# Validate the parsed spec against the schema file at _EXT_SCHEMA_PATH.
pykwalify.core.Core(
source_data=commands_spec,
schema_files=[_EXT_SCHEMA_PATH]).validate()
except pykwalify.errors.SchemaError as e:
# Schema violations are reported with the same exception type.
raise ExtensionCommandError from e
# Expand every entry under 'west-commands' via _ext_specs_from_desc and
# append the results to ret (initialised outside this fragment).
for commands_desc in commands_spec['west-commands']:
ret.extend(_ext_specs_from_desc(project, commands_desc))
return ret
def _yaml_validate(data, schema):
if not schema:
return
c = pykwalify.core.Core(source_data=data, schema_data=schema)
c.validate(raise_exception=True)
def validate(data, schema):
    """Check ``data`` against a pykwalify ``schema``; skip when none is given.

    Returns ``None`` in every case. With a truthy schema, pykwalify is run
    with ``raise_exception=True`` so an invalid document raises instead of
    being reported through a return value.
    """
    schema_missing = not schema
    if schema_missing:
        return None

    pykwalify.core.Core(
        source_data=data,
        schema_data=schema,
    ).validate(raise_exception=True)
    return None
def _yaml_validate(data, schema):
if not schema:
return
c = pykwalify.core.Core(source_data = data, schema_data = schema)
c.validate(raise_exception = True)
# version: "1.0"
#
# by explicitly allowing:
#
# version: 1.0
# Normalise the 'version' value to a string before parsing, so a value that
# YAML loaded as a number is handled the same way as a quoted one.
min_version_str = str(data['version'])
min_version = parse_version(min_version_str)
# Reject versions outside the supported window: newer than _SCHEMA_VER
# raises ManifestVersionError, older than _EARLIEST_VER raises
# MalformedManifest with a message naming the lowest supported version.
if min_version > _SCHEMA_VER:
raise ManifestVersionError(min_version_str)
elif min_version < _EARLIEST_VER:
raise MalformedManifest(
f'invalid version {min_version_str}; '
f'lowest schema version is {_EARLIEST_VER_STR}')
# Validate the data against the schema file at _SCHEMA_PATH. A pykwalify
# SchemaError is re-raised as MalformedManifest carrying the original
# message (se.msg) and chained to the original exception.
try:
pykwalify.core.Core(source_data=data,
schema_files=[_SCHEMA_PATH]).validate()
except pykwalify.errors.SchemaError as se:
raise MalformedManifest(se.msg) from se
# Raised when the west-commands file resolves outside the project directory
# (per the message text; the guarding condition is outside this fragment).
raise ExtensionCommandError(
'west-commands file {} escapes project path {}'.
format(project.west_commands, project.path))
# Project may not be cloned yet.
# In that case there is nothing to load: report no extension commands.
if not os.path.exists(spec_file):
return []
# Load the spec file and check the schema.
with open(spec_file, 'r') as f:
try:
commands_spec = yaml.safe_load(f.read())
except yaml.YAMLError as e:
# Unparseable YAML surfaces as ExtensionCommandError, chained to the
# underlying parser error.
raise ExtensionCommandError from e
try:
# Validate the parsed spec against the schema file at _EXT_SCHEMA_PATH.
pykwalify.core.Core(
source_data=commands_spec,
schema_files=[_EXT_SCHEMA_PATH]).validate()
except pykwalify.errors.SchemaError as e:
# Schema violations are reported with the same exception type.
raise ExtensionCommandError from e
# Collect the specs produced by _ext_specs_from_desc for every entry under
# 'west-commands' into one flat list.
ret = []
for commands_desc in commands_spec['west-commands']:
ret.extend(_ext_specs_from_desc(project, commands_desc))
return ret