Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
sos_targets('a2.txt', 'a4.txt')])
# group_by = 'pairs2'
script = SoS_Script('''
[0: shared='executed']
executed = []
input: [f'a{x}.txt' for x in range(1, 9)], group_by='pairs2'
executed.append(_input)
''')
wf = script.workflow()
Base_Executor(wf).run(mode='dryrun')
self.assertEqual(env.sos_dict['executed'], [
sos_targets('a1.txt', 'a2.txt', 'a5.txt', 'a6.txt'),
sos_targets('a3.txt', 'a4.txt', 'a7.txt', 'a8.txt')
])
# group_by = 'pairs3'
script = SoS_Script('''
[0: shared='executed']
executed = []
input: [f'a{x}.txt' for x in range(1, 13)], group_by='pairs3'
executed.append(_input)
''')
wf = script.workflow()
Base_Executor(wf).run(mode='dryrun')
self.assertEqual(env.sos_dict['executed'], [
sos_targets('a1.txt', 'a2.txt', 'a3.txt', 'a7.txt', 'a8.txt',
'a9.txt'),
])
# group_by = 'pairwiseN'
script = SoS_Script('''
[0: shared='executed']
executed = []
input: ['a{}.txt'.format(x) for x in range(1, 7)], group_by='pairwise2'
executed.append(_input)
''')
wf = script.workflow()
Base_Executor(wf).run(mode='dryrun')
self.assertEqual(env.sos_dict['executed'], [
sos_targets('a1.txt', 'a2.txt', 'a3.txt', 'a4.txt'),
sos_targets('a3.txt', 'a4.txt', 'a5.txt', 'a6.txt')
], f'obtained {env.sos_dict["executed"]}')
# group_by = 'combinations'
script = SoS_Script('''
[0: shared='executed']
executed = []
input: ['a{}.txt'.format(x) for x in range(1, 5)], group_by='combinations'
executed.append(_input)
''')
wf = script.workflow()
Base_Executor(wf).run(mode='dryrun')
self.assertEqual(env.sos_dict['executed'], [
sos_targets('a1.txt', 'a2.txt'),
script = SoS_Script('''
parameter: path_var = paths('a.txt')
[0]
''')
wf = script.workflow()
Base_Executor(wf, args=['--path-var', 'a.txt', 'b.txt']).run(mode='run')
self.assertTrue(isinstance(env.sos_dict['path_var'], paths))
#
#
script = SoS_Script('''
parameter: path_var = sos_targets('a.txt')
[0]
''')
wf = script.workflow()
Base_Executor(wf, args=['--path-var', 'a.txt']).run(mode='run')
self.assertTrue(isinstance(env.sos_dict['path_var'], sos_targets))
#
# Test allowing the use of sos keywords as parameters (#1041)
script = SoS_Script('''\
[1]
parameter: input = 5
output = 10
python: expand=True
print({input})
''')
wf = script.workflow()
Base_Executor(wf).run()
# multiple parameters
script = SoS_Script('''
parameter: a_b = int
[0]
def testSoSTargetsSignature(self):
    '''Check that a sos_targets signature validates and ignores variables.'''
    # create the two files that the targets refer to
    for filename, content in (('a.txt', 'text1'), ('b.txt', 'text2')):
        with open(filename, 'w') as handle:
            handle.write(content)
    targets = sos_targets('a.txt', 'b.txt')
    signature = targets.target_signature()
    self.assertTrue(targets.validate(signature))
    # setting a variable on a single member must leave the signature unchanged
    first_target = targets[0]
    first_target.set('a', 2)
    self.assertEqual(signature, targets.target_signature())
    # setting a variable on the whole group must not invalidate the saved signature
    targets.set('cc', 'another string')
    self.assertTrue(targets.validate(signature))
# group_by = 'pairs3'
script = SoS_Script('''
[0: shared='executed']
executed = []
input: [f'a{x}.txt' for x in range(1, 13)], group_by='pairs3'
executed.append(_input)
''')
wf = script.workflow()
Base_Executor(wf).run(mode='dryrun')
self.assertEqual(env.sos_dict['executed'], [
sos_targets('a1.txt', 'a2.txt', 'a3.txt', 'a7.txt', 'a8.txt',
'a9.txt'),
sos_targets('a4.txt', 'a5.txt', 'a6.txt', 'a10.txt', 'a11.txt',
'a12.txt')
])
# group_by = 'pairwise'
script = SoS_Script('''
[0: shared='executed']
executed = []
input: ['a{}.txt'.format(x) for x in range(1, 5)], group_by='pairwise'
executed.append(_input)
''')
wf = script.workflow()
Base_Executor(wf).run(mode='dryrun')
self.assertEqual(env.sos_dict['executed'], [
task:
python: expand=True
print(f'{output}')
[A_5]
task:
print(f'{_output}')
''')
wf = script.workflow('A')
Base_Executor(wf)
for section in wf.sections:
res = analyze_section(section)
if section.names[0][1] == '1':
self.assertTrue(res['step_input'].undetermined())
self.assertEqual(res['step_depends'], sos_targets())
self.assertEqual(res['step_output'], sos_targets())
self.assertEqual(res['environ_vars'], {'b', 'p1', 'infiles'})
self.assertEqual(res['signature_vars'], {'c'})
self.assertEqual(res['changed_vars'], {'b'})
elif section.names[0][1] == '2':
self.assertEqual(res['step_input'], sos_targets())
self.assertEqual(res['step_depends'],
sos_targets('some.txt', executable('ls')))
self.assertTrue(res['step_output'].unspecified())
# for_each will not be used for DAG
self.assertEqual(res['environ_vars'],
{'b', 'for_each', 'executable'})
self.assertEqual(res['signature_vars'], {'r', 'time', 'random'})
self.assertEqual(res['changed_vars'], set())
elif section.names[0][1] == '4':
self.assertTrue('output' in res['signature_vars'])
elif section.names[0][1] == '5':
env.config['sig_mode'] = 'default'
# generate files (default step 0 and 1)
Base_Executor(wf, config={'default_queue': 'localhost'}).run()
# now, rerun in build mode
env.config['sig_mode'] = 'build'
res = Base_Executor(wf, config={'default_queue': 'localhost'}).run()
self.assertEqual(res['__completed__']['__step_completed__'], 0)
#
self.assertTrue(os.path.isfile('temp/c.txt'))
self.assertTrue(os.path.isfile('temp/d.txt'))
with open('temp/c.txt') as tc:
self.assertTrue(tc.read(), 'a.txt')
with open('temp/d.txt') as td:
self.assertTrue(td.read(), 'b.txt')
self.assertEqual(env.sos_dict['oa'],
sos_targets('temp/c.txt', 'temp/d.txt'))
#
# now in assert mode, the signature should be there
env.config['sig_mode'] = 'assert'
res = Base_Executor(wf, config={'default_queue': 'localhost'}).run()
self.assertEqual(res['__completed__']['__step_completed__'], 0)
#
env.config['sig_mode'] = 'default'
res = Base_Executor(wf, config={'default_queue': 'localhost'}).run()
self.assertEqual(res['__completed__']['__step_completed__'], 0)
#
# change script a little bit
script = SoS_Script('# comment\n' + text)
wf = script.workflow()
env.config['sig_mode'] = 'assert'
#pid = p.pid
out, err = p.communicate()
sys.stdout.write(out.decode())
sys.stderr.write(err.decode())
ret = p.returncode
else:
p = subprocess.Popen(cmd, shell=True)
#pid = p.pid
ret = p.wait()
except Exception as e:
env.logger.error(e)
if ret != 0:
temp_file = os.path.join('.sos', f'{"Rmarkdown"}_{os.getpid()}.md')
shutil.copyfile(str(input), temp_file)
cmd = interpolate(f'Rscript -e "rmarkdown::render({args})"',
{'input': input, 'output': sos_targets(temp_file)})
raise RuntimeError(
f'Failed to execute script. Please use command \n"{cmd}"\nunder {os.getcwd()} to test it.')
if write_to_stdout:
with open(str(output[0])) as out:
sys.stdout.write(out.read())
else:
env.logger.info(f'Report saved to {output}')
def init_input_output_vars(self):
# Seed the step input/output variables in env.sos_dict before the step runs.
# NOTE(review): leading indentation was lost in this copy of the file, so the
# nesting of the statements below (in particular whether the final pop() is
# conditional on the output check) must be confirmed against the real source.
#
# we keep these variables (which can be result of stepping through previous statements)
# if no input and/or output statement is defined
# Any of the listed keys that is missing from env.sos_dict is set to an empty sos_targets.
# NOTE(review): '_depends' appears twice in the tuple; the second occurrence is
# redundant as written -- confirm whether a different key was intended.
for key in ('step_input', '_depends', 'step_output', 'step_depends',
'_depends'):
if key not in env.sos_dict:
env.sos_dict.set(key, sos_targets([]))
# '_output' gets an undetermined sos_targets when it is not already defined.
if '_output' not in env.sos_dict:
env.sos_dict.set('_output', sos_targets(_undetermined=True))
# If any statement of the step has ':' and 'input' as its first two items,
# step_input and _input are reset to empty targets.
if any(x[0] == ':' and x[1] == 'input' for x in self.step.statements):
env.sos_dict.set('step_input', sos_targets([]))
env.sos_dict.set('_input', sos_targets([]))
# Likewise for a statement starting with ':' and 'output':
# step_output and _output are reset to empty targets.
if any(x[0] == ':' and x[1] == 'output' for x in self.step.statements):
env.sos_dict.set('step_output', sos_targets([]))
env.sos_dict.set('_output', sos_targets([]))
# Remove '__default_output__' if present; the None default avoids a KeyError when absent.
env.sos_dict.pop('__default_output__', None)