Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
[A_3]
input: 'a1.md', 'a2.md'
output: 'out.md'
report: input=['a1.md', 'a2.md'], output='out.md'
''')
wf = script.workflow()
Base_Executor(wf).run()
with open('a1.md') as a:
self.assertEqual(a.read(), 'a1\n\n')
with open('a2.md') as a:
self.assertEqual(a.read(), 'a2\n\n')
with open('out.md') as a:
self.assertEqual(a.read(), 'a1\n\na2\n\n')
for name in ('a1.md', 'a2.md', 'out.md'):
if file_target(name).exists():
file_target(name).unlink()
wf = script.workflow()
Base_Executor(wf).run()
"C1 (C1.txt)";
"B1 (B1.txt)";
"B2 (B2.txt)";
"C3 (C3.txt)";
"B3 (B3.txt)" -> "B2 (B2.txt)";
"C4 (C4.txt)" -> "C3 (C3.txt)";
"C4 (C4.txt)" -> "C2 (C2.txt)";
"C2 (C2.txt)" -> "C1 (C1.txt)";
"C1 (C1.txt)" -> "B2 (B2.txt)";
"B2 (B2.txt)" -> "B1 (B1.txt)";
"C3 (C3.txt)" -> "C1 (C1.txt)";
}
''')
Base_Executor(wf).run(targets=['B1.txt'])
for f in ['A1.txt', 'A2.txt']:
self.assertFalse(file_target(f).target_exists())
for f in [
'C2.txt', 'B2.txt', 'B1.txt', 'B3.txt', 'C1.txt', 'C3.txt',
'C4.txt'
]:
t = file_target(f)
self.assertTrue(t.target_exists())
t.unlink()
#
# test 2, we would like to generate two files
dag = Base_Executor(wf).initialize_dag(targets=['B2.txt', 'C2.txt'])
# note that A2 is no longer mentioned
self.assertDAG(
dag, '''
strict digraph "" {
"C4 (C4.txt)";
"B2 (B2.txt)";
cp local.txt result_remote.txt
echo 'adf' >> 'result_remote.txt'
''')
self.assertEqual(
subprocess.call(
'sos run test_remote.sos -c ~/docker.yml -r docker -s force -q localhost',
shell=True), 0)
self.assertFalse(file_target('result_remote.txt').target_exists())
#self.assertEqual(subprocess.call('sos preview result_remote.txt -c ~/docker.yml -r docker', shell=True), 0)
#self.assertNotEqual(subprocess.call('sos preview result_remote.txt', shell=True), 0)
self.assertEqual(
subprocess.call(
'sos remote pull docker --files result_remote.txt -c ~/docker.yml',
shell=True), 0)
self.assertTrue(file_target('result_remote.txt').target_exists())
#self.assertEqual(subprocess.call('sos preview result_remote.txt', shell=True), 0)
with open('result_remote.txt') as w:
content = w.read()
self.assertTrue('something' in content, 'Got {}'.format(content))
self.assertTrue('adf' in content, 'Got {}'.format(content))
# test sos remote run
self.assertEqual(
subprocess.call(
'sos remote run docker -c ~/docker.yml --cmd cp result_remote.txt result_remote1.txt ',
shell=True), 0)
self.assertEqual(
subprocess.call(
'sos remote pull docker --files result_remote1.txt -c ~/docker.yml',
shell=True), 0)
self.assertTrue(file_target('result_remote1.txt').target_exists())
report: output='default_10.md'
A_10
[20]
report: output='default_20.md'
A_20
[100]
# generate report
Rmarkdown(input=['default_10.md', 'default_20.md'], output='output.html')
''')
wf = script.workflow()
Base_Executor(wf, config={'report_output': '${step_name}.md'}).run()
for f in ['default_10.md', 'default_20.md', 'output.html']:
self.assertTrue(file_target(f).exists())
file_target(f).unlink()
def testZap(self):
'''Test zap'''
with open('testzap.txt', 'w') as sf:
sf.write('some text')
path('testzap.txt').zap()
self.assertTrue(os.path.isfile('testzap.txt.zapped'))
self.assertFalse(os.path.isfile('testzap.txt'))
# re-zap is ok
file_target('testzap.txt').zap()
self.assertTrue(os.path.isfile('testzap.txt.zapped'))
self.assertFalse(os.path.isfile('testzap.txt'))
# non-existent file
os.remove('testzap.txt.zapped')
self.assertRaises(FileNotFoundError, path('testzap.txt').zap)
#
with open('testzap.txt', 'w') as sf:
sf.write('some text')
with open('testzap1.txt', 'w') as sf:
sf.write('some text')
paths('testzap.txt', 'testzap1.txt').zap()
self.assertTrue(os.path.isfile('testzap.txt.zapped'))
self.assertFalse(os.path.isfile('testzap.txt'))
self.assertTrue(os.path.isfile('testzap1.txt.zapped'))
self.assertFalse(os.path.isfile('testzap1.txt'))
#
def testOutputGroupWith(self):
'''Test option group_with in output statement'''
self.touch(['a.txt', 'b.txt'])
for ofile in ['a.txt1', 'b.txt2']:
if file_target(ofile).exists():
file_target(ofile).unlink()
#
# string input
script = SoS_Script(r'''
[0]
files = ['a.txt', 'b.txt']
vars = [1, 2]
input: files, group_by=1
output: f"{_input}.bak", group_with=dict(_vars=vars[_index])
run: expand=True
touch {_output}
[1]
assert(_vars == _index + 1)
''')
def tearDown(self):
for f in self.temp_files:
file_target(f).unlink()
os.chdir(self.olddir)
def testOptionWorkdir(self):
'''Test option workdir of tasks'''
if not os.path.isdir('temp_wdr'):
os.mkdir('temp_wdr')
with open(os.path.join('temp_wdr', 'a.txt'), 'w') as tmp:
tmp.write('hello')
script = SoS_Script(r'''
[A_1]
run: workdir='temp_wdr'
cp -f a.txt a2.txt
''')
wf = script.workflow()
Base_Executor(wf).run()
self.assertTrue(
file_target(os.path.join('temp_wdr', 'a2.txt')).target_exists())
with open(os.path.join('temp_wdr', 'a.txt')) as tmp:
self.assertEqual('hello', tmp.read())
"C4 (C4.txt)" -> "C3 (C3.txt)";
"C4 (C4.txt)" -> "C2 (C2.txt)";
"C2 (C2.txt)" -> "C1 (C1.txt)";
"C1 (C1.txt)" -> "B2 (B2.txt)";
"B2 (B2.txt)" -> "B1 (B1.txt)";
"C3 (C3.txt)" -> "C1 (C1.txt)";
}
''')
Base_Executor(wf).run(targets=['B1.txt'])
for f in ['A1.txt', 'A2.txt']:
self.assertFalse(file_target(f).target_exists())
for f in [
'C2.txt', 'B2.txt', 'B1.txt', 'B3.txt', 'C1.txt', 'C3.txt',
'C4.txt'
]:
t = file_target(f)
self.assertTrue(t.target_exists())
t.unlink()
#
# test 2, we would like to generate two files
dag = Base_Executor(wf).initialize_dag(targets=['B2.txt', 'C2.txt'])
# note that A2 is no longer mentioned
self.assertDAG(
dag, '''
strict digraph "" {
"C4 (C4.txt)";
"B2 (B2.txt)";
"C3 (C3.txt)";
"B3 (B3.txt)";
"C2 (C2.txt)";
"C1 (C1.txt)";
"C4 (C4.txt)" -> "C2 (C2.txt)";
def tearDown(self):
for f in self.temp_files:
file_target(f).unlink()
os.chdir(self.olddir)