Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# internal python types in the yaml loading.
yml.constructor.add_constructor('tag:yaml.org,2002:python/bool', Constructor.construct_yaml_bool)
yml.constructor.add_constructor('tag:yaml.org,2002:python/complex', Constructor.construct_python_complex)
yml.constructor.add_constructor('tag:yaml.org,2002:python/dict', Constructor.construct_yaml_map)
yml.constructor.add_constructor('tag:yaml.org,2002:python/float', Constructor.construct_yaml_float)
yml.constructor.add_constructor('tag:yaml.org,2002:python/int', Constructor.construct_yaml_int)
yml.constructor.add_constructor('tag:yaml.org,2002:python/list', Constructor.construct_yaml_seq)
yml.constructor.add_constructor('tag:yaml.org,2002:python/long', Constructor.construct_python_long)
yml.constructor.add_constructor('tag:yaml.org,2002:python/none', Constructor.construct_yaml_null)
yml.constructor.add_constructor('tag:yaml.org,2002:python/str', Constructor.construct_python_str)
yml.constructor.add_constructor('tag:yaml.org,2002:python/tuple', Constructor.construct_python_tuple)
yml.constructor.add_constructor('tag:yaml.org,2002:python/unicode', Constructor.construct_python_unicode)
if data_file_obj:
try:
self.source = yml.load(data_file_obj.read())
except Exception as e:
raise CoreError("Unable to load data_file_obj input")
if schema_file_obj:
try:
self.schema = yml.load(schema_file_obj.read())
except Exception as e:
raise CoreError("Unable to load schema_file_obj")
if source_file is not None:
if not os.path.exists(source_file):
raise CoreError(u"Provided source_file do not exists on disk: {0}".format(source_file))
with open(source_file, "r", encoding=file_encoding) as stream:
if source_file.endswith(".json"):
self.source = json.load(stream)
if not isinstance(schema_files, list):
raise CoreError(u"schema_files must be of list type")
# Merge all schema files into one single file for easy parsing
if len(schema_files) > 0:
schema_data = {}
for f in schema_files:
if not os.path.exists(f):
raise CoreError(u"Provided source_file do not exists on disk : {0}".format(f))
with open(f, "r", encoding=file_encoding) as stream:
if f.endswith(".json"):
data = json.load(stream)
elif f.endswith(".yaml") or f.endswith(".yml"):
data = yml.load(stream)
if not data:
raise CoreError(u"No data loaded from file : {0}".format(f))
else:
raise CoreError(u"Unable to load file : {0} : Unknown file format. Supported file endings is [.json, .yaml, .yml]")
for key in data.keys():
if key in schema_data.keys():
raise CoreError(u"Parsed key : {0} : two times in schema files...".format(key))
schema_data = dict(schema_data, **data)
self.schema = schema_data
# Nothing was loaded so try the source_data variable
if self.source is None:
log.debug(u"No source file loaded, trying source data variable")
yml.constructor.add_constructor('tag:yaml.org,2002:python/list', Constructor.construct_yaml_seq)
yml.constructor.add_constructor('tag:yaml.org,2002:python/long', Constructor.construct_python_long)
yml.constructor.add_constructor('tag:yaml.org,2002:python/none', Constructor.construct_yaml_null)
yml.constructor.add_constructor('tag:yaml.org,2002:python/str', Constructor.construct_python_str)
yml.constructor.add_constructor('tag:yaml.org,2002:python/tuple', Constructor.construct_python_tuple)
yml.constructor.add_constructor('tag:yaml.org,2002:python/unicode', Constructor.construct_python_unicode)
if data_file_obj:
try:
self.source = yml.load(data_file_obj.read())
except Exception as e:
raise CoreError("Unable to load data_file_obj input")
if schema_file_obj:
try:
self.schema = yml.load(schema_file_obj.read())
except Exception as e:
raise CoreError("Unable to load schema_file_obj")
if source_file is not None:
if not os.path.exists(source_file):
raise CoreError(u"Provided source_file do not exists on disk: {0}".format(source_file))
with open(source_file, "r", encoding=file_encoding) as stream:
if source_file.endswith(".json"):
self.source = json.load(stream)
elif source_file.endswith(".yaml") or source_file.endswith('.yml'):
self.source = yml.load(stream)
else:
raise CoreError(u"Unable to load source_file. Unknown file format of specified file path: {0}".format(source_file))
if not isinstance(schema_files, list):
if schema_file_obj:
try:
self.schema = yml.load(schema_file_obj.read())
except Exception as e:
raise CoreError("Unable to load schema_file_obj")
if source_file is not None:
if not os.path.exists(source_file):
raise CoreError(u"Provided source_file do not exists on disk: {0}".format(source_file))
with open(source_file, "r", encoding=file_encoding) as stream:
if source_file.endswith(".json"):
self.source = json.load(stream)
elif source_file.endswith(".yaml") or source_file.endswith('.yml'):
self.source = yml.load(stream)
else:
raise CoreError(u"Unable to load source_file. Unknown file format of specified file path: {0}".format(source_file))
if not isinstance(schema_files, list):
raise CoreError(u"schema_files must be of list type")
# Merge all schema files into one single file for easy parsing
if len(schema_files) > 0:
schema_data = {}
for f in schema_files:
if not os.path.exists(f):
raise CoreError(u"Provided source_file do not exists on disk : {0}".format(f))
with open(f, "r", encoding=file_encoding) as stream:
if f.endswith(".json"):
data = json.load(stream)