Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_19_dump_and_single_load_with_validation(self):
if not anyconfig.schema.JSONSCHEMA_IS_AVAIL:
skip_test()
cnf = CNF_0
scm = SCM_0
cnf_path = os.path.join(self.workdir, "cnf_19.json")
scm_path = os.path.join(self.workdir, "scm_19.json")
TT.dump(cnf, cnf_path)
TT.dump(scm, scm_path)
self.assertTrue(os.path.exists(cnf_path))
self.assertTrue(os.path.exists(scm_path))
cnf_1 = TT.single_load(cnf_path, ac_schema=scm_path)
self.assertFalse(cnf_1 is None) # Validation should succeed.
self.assertTrue(dicts_equal(cnf_1, cnf), cnf_1)
cnf_2 = cnf.copy()
cnf_2["a"] = "aaa" # It's type should be integer not string.
cnf_2_path = os.path.join(self.workdir, "cnf_19_2.json")
TT.dump(cnf_2, cnf_2_path)
self.assertTrue(os.path.exists(cnf_2_path))
cnf_3 = TT.single_load(cnf_2_path, ac_schema=scm_path)
self.assertTrue(cnf_3 is None) # Validation should fail.
def test_19_dump_and_single_load_with_validation(self):
if not anyconfig.schema.JSONSCHEMA_IS_AVAIL:
skip_test()
cnf = CNF_0
scm = SCM_0
cnf_path = os.path.join(self.workdir, "cnf_19.json")
scm_path = os.path.join(self.workdir, "scm_19.json")
TT.dump(cnf, cnf_path)
TT.dump(scm, scm_path)
self.assertTrue(os.path.exists(cnf_path))
self.assertTrue(os.path.exists(scm_path))
cnf_1 = TT.single_load(cnf_path, ac_schema=scm_path)
self.assertFalse(cnf_1 is None) # Validation should succeed.
self.assertTrue(dicts_equal(cnf_1, cnf), cnf_1)
cnf_2 = cnf.copy()
cnf_2["a"] = "aaa" # It's type should be integer not string.
cnf_2_path = os.path.join(self.workdir, "cnf_19_2.json")
TT.dump(cnf_2, cnf_2_path)
self.assertTrue(os.path.exists(cnf_2_path))
cnf_3 = TT.single_load(cnf_2_path, ac_schema=scm_path)
def test_60_multi_load__w_ac_dict_option(self):
TT.dump(self.dic, self.a_path)
TT.dump(self.upd, self.b_path)
res = TT.multi_load(self.g_path, ac_dict=MyODict)
self.assert_dicts_equal(res, self.exp)
self.assertTrue(isinstance(res, MyODict))
def _check_multi_load_with_strategy(self, exp, merge=TT.MS_DICTS):
TT.dump(self.dic, self.a_path)
TT.dump(self.upd, self.b_path)
self.assertTrue(os.path.exists(self.a_path))
self.assertTrue(os.path.exists(self.b_path))
res0 = TT.multi_load(self.g_path, ac_merge=merge)
res1 = TT.multi_load([self.g_path, self.b_path], ac_merge=merge)
self.assertTrue(res0)
self.assertTrue(res1)
self.assert_dicts_equal(res0, exp)
self.assert_dicts_equal(res1, exp)
def test_32_dump_and_load__w_options(self):
TT.dump(self.dic, self.a_path, indent=2)
self.assertTrue(os.path.exists(self.a_path))
TT.dump(self.upd, self.b_path, indent=2)
self.assertTrue(os.path.exists(self.b_path))
res = TT.load(self.a_path, parse_int=int)
dic = copy.deepcopy(self.dic)
self.assert_dicts_equal(res, dic)
res = TT.load(self.g_path, parse_int=int)
exp = copy.deepcopy(self.exp)
self.assert_dicts_equal(res, exp)
res = TT.load([self.a_path, self.b_path], parse_int=int)
exp = copy.deepcopy(self.exp)
self.assert_dicts_equal(res, exp)
def test_32_dump_and_load__w_options(self):
TT.dump(self.dic, self.a_path, indent=2)
self.assertTrue(os.path.exists(self.a_path))
TT.dump(self.upd, self.b_path, indent=2)
self.assertTrue(os.path.exists(self.b_path))
res = TT.load(self.a_path, parse_int=int)
dic = copy.deepcopy(self.dic)
self.assert_dicts_equal(res, dic)
res = TT.load(self.g_path, parse_int=int)
exp = copy.deepcopy(self.exp)
self.assert_dicts_equal(res, exp)
res = TT.load([self.a_path, self.b_path], parse_int=int)
exp = copy.deepcopy(self.exp)
self.assert_dicts_equal(res, exp)
def test_39_single_load__w_validation(self):
(cnf, scm) = (CNF_0, SCM_0)
cpath = os.path.join(self.workdir, "cnf.json")
spath = os.path.join(self.workdir, "scm.json")
TT.dump(cnf, cpath)
TT.dump(scm, spath)
cnf1 = TT.single_load(cpath, ac_schema=spath)
self.assert_dicts_equal(cnf, cnf1)
def _load_and_dump_with_opened_files(self, filename, rmode='r', wmode='w',
**oopts):
cpath = os.path.join(self.workdir, filename)
with TT.open(cpath, 'w', **oopts) as out:
TT.dump(self.cnf, out)
self.assertTrue(_is_file_object(out))
self.assertEqual(out.mode, wmode)
with TT.open(cpath, 'rb', **oopts) as inp:
cnf1 = TT.single_load(inp)
self.assertTrue(_is_file_object(inp))
self.assertEqual(inp.mode, rmode)
cpair = (self.cnf, cnf1)
self.assertTrue(dicts_equal(*cpair), "%r vs. %r" % cpair)
def test_36_load_w_validation(self):
cnf_path = os.path.join(self.workdir, "cnf.json")
scm_path = os.path.join(self.workdir, "scm.json")
TT.dump(CNF_0, cnf_path)
TT.dump(SCM_0, scm_path)
cnf_2 = TT.load(cnf_path, ac_context={}, ac_schema=scm_path)
self.assertEqual(cnf_2["name"], CNF_0["name"])
self.assertEqual(cnf_2["a"], CNF_0["a"])
self.assertEqual(cnf_2["b"]["b"], CNF_0["b"]["b"])
self.assertEqual(cnf_2["b"]["c"], CNF_0["b"]["c"])
def _try_dump(cnf, outpath, otype, fmsg, extra_opts=None):
"""
:param cnf: Configuration object to print out
:param outpath: Output file path or None
:param otype: Output type or None
:param fmsg: message if it cannot detect otype by 'inpath'
:param extra_opts: Map object will be given to API.dump as extra options
"""
if extra_opts is None:
extra_opts = {}
try:
API.dump(cnf, outpath, otype, **extra_opts)
except API.UnknownFileTypeError:
_exit_with_output(fmsg % outpath, 1)
except API.UnknownProcessorTypeError:
_exit_with_output("Invalid output type '%s'" % otype, 1)