def test_curl_plugin_fetch_evil_archive(self):
# There are several evil archives checked in under tests/resources. The
# others are checked directly as part of test_curl_plugin.py.
fields = {
'url': (shared.test_resources / '.tar').as_uri(),
'unpack': 'tar',
}
with self.assertRaises(plugin.PluginRuntimeError):
self.do_plugin_test('curl', fields, {})
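# The evil archives mentioned above exercise malicious contents such as tar
# entries whose names escape the extraction directory. As a minimal sketch of
# the kind of check they're designed to defeat (safe_extract is an
# illustrative helper, not part of peru itself):
import os
import tarfile

def safe_extract(archive_path, dest):
    with tarfile.open(archive_path) as tar:
        for member in tar.getmembers():
            # Refuse any member whose resolved path lands outside dest. This
            # catches both '..' components and absolute member names.
            member_path = os.path.abspath(os.path.join(dest, member.name))
            if os.path.commonpath(
                    [os.path.abspath(dest), member_path]) != \
                    os.path.abspath(dest):
                raise RuntimeError(
                    'archive member escapes destination: ' + member.name)
        tar.extractall(dest)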
def write(self, string):
tokens = [
display.ANSI_DISABLE_LINE_WRAP, display.ANSI_ENABLE_LINE_WRAP,
display.ANSI_CLEAR_LINE, display.ANSI_CURSOR_UP_ONE_LINE, '\n'
]
# The parens make this a capturing expression, so the tokens will be
# included in re.split()'s return list.
token_expr = '(' + '|'.join(re.escape(token) for token in tokens) + ')'
pieces = re.split(token_expr, string)
for piece in pieces:
if piece in (display.ANSI_DISABLE_LINE_WRAP,
display.ANSI_ENABLE_LINE_WRAP):
# Ignore the line wrap codes. TODO: Test for these?
continue
elif piece == display.ANSI_CLEAR_LINE:
buffer = self.lines[self.cursor_line]
buffer.seek(0)
buffer.truncate()
elif piece == display.ANSI_CURSOR_UP_ONE_LINE:
col = self.lines[self.cursor_line].tell()
self.cursor_line -= 1
assert self.cursor_line >= 0
new_buffer = self.lines[self.cursor_line]
new_buffer.seek(col)
elif piece == '\n':
self.cursor_line += 1
if self.cursor_line == len(self.lines):
self.lines.append(io.StringIO())
self.lines[self.cursor_line].seek(0)
else:
self.lines[self.cursor_line].write(piece)
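# The capturing-group trick in write() above can be seen in isolation:
# re.split() includes the delimiters themselves in its result list whenever
# the pattern contains a capturing group, which is what lets the loop replay
# each token in order. (The escape code below is a sample ANSI sequence for
# illustration, not necessarily peru's ANSI_CLEAR_LINE constant.)
import re

tokens = ['\x1b[2K', '\n']
token_expr = '(' + '|'.join(re.escape(t) for t in tokens) + ')'
assert re.split(token_expr, 'one\x1b[2Ktwo\nthree') == \
    ['one', '\x1b[2K', 'two', '\n', 'three']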
# (The test method name below is assumed; the original snippet is truncated.)
def test_sync_reuses_index_file(self):
    '''A no-op sync should be a single git command. Also check that index
    files are deleted after any sync error.'''
module_dir = shared.create_dir({'foo': 'bar'})
self.write_yaml(
'''\
cp module foo:
path: {}
imports:
foo: subdir
''', module_dir)
index_path = os.path.join(self.test_dir, '.peru/lastimports.index')
# The first sync should take multiple operations and create a
# lastimports.index file.
peru.cache.DEBUG_GIT_COMMAND_COUNT = 0
self.do_integration_test(['sync'], {'subdir/foo': 'bar'})
assert peru.cache.DEBUG_GIT_COMMAND_COUNT > 1, \
'The first sync should take multiple operations.'
assert os.path.exists(index_path), \
'The first sync should create an index file.'
# The second sync should reuse the index file and only take one
# operation.
peru.cache.DEBUG_GIT_COMMAND_COUNT = 0
self.do_integration_test(['sync'], {'subdir/foo': 'bar'})
assert peru.cache.DEBUG_GIT_COMMAND_COUNT == 1, \
'The second sync should take only one operation.'
assert os.path.exists(index_path), \
'The second sync should preserve the index file.'
# Now force an error. This should delete the index file.
with open(os.path.join(self.test_dir, 'subdir/foo'), 'w') as f:
f.write('dirty')
with self.assertRaises(peru.cache.DirtyWorkingCopyError):
run_peru_command(['sync'], self.test_dir)
assert not os.path.exists(index_path), \
'The error should delete the index file.'
# Fix the error and resync with new module contents. This should
# recreate the index file with the current tree and then succeed,
# rather than using an empty index and treating the current files as
# conflicting.
with open(os.path.join(self.test_dir, 'subdir/foo'), 'w') as f:
f.write('bar')
with open(os.path.join(module_dir, 'foo'), 'w') as f:
f.write('new bar')
self.do_integration_test(['sync', '--no-cache'],
{'subdir/foo': 'new bar'})
assert os.path.exists(index_path), \
'The index should have been recreated.'
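# Judging from the assertions above, DEBUG_GIT_COMMAND_COUNT is a
# module-level counter that peru's cache bumps once per git invocation, so
# tests can assert how many commands a sync cost. A minimal sketch of that
# pattern (run_git is a hypothetical name, not peru's actual implementation):
import subprocess

DEBUG_GIT_COMMAND_COUNT = 0

def run_git(*args, cwd=None):
    global DEBUG_GIT_COMMAND_COUNT
    DEBUG_GIT_COMMAND_COUNT += 1  # count every git command for the tests
    return subprocess.run(('git',) + args, cwd=cwd, check=True,
                          capture_output=True)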
def test_sync_from_subdir(self):
module_dir = shared.create_dir({'foo': 'bar'})
self.write_yaml(
'''\
# Use a relative module path, to make sure it gets resolved
# relative to the project root and not the dir where peru was
# called.
cp module relative_foo:
path: {}
imports:
relative_foo: subdir
''', os.path.relpath(module_dir, start=self.test_dir))
subdir = os.path.join(self.test_dir, 'a', 'b')
peru.compat.makedirs(subdir)
run_peru_command(['sync'], subdir)
self.assertTrue(
os.path.isdir(os.path.join(self.test_dir, '.peru')),
msg=".peru dir didn't end up in the right place")
assert_contents(os.path.join(self.test_dir, 'subdir'), {'foo': 'bar'})
def _tmp_root():
root = os.path.join(tempfile.gettempdir(), 'peru', 'test')
makedirs(root)
return root
def test_build_field_deprecated_message(self):
input = dedent('''\
rule foo:
build: shell command
''')
try:
parse_string(input)
except ParserError as e:
assert 'The "build" field is no longer supported.' in e.message
else:
assert False, 'expected ParserError'
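# The try/except/else pattern above can also be written with assertRaises in
# its context-manager form, the more common unittest idiom (a sketch,
# assuming the surrounding class derives from unittest.TestCase and that
# ParserError exposes .message as above):
def test_build_field_deprecated_message_alt(self):
    input = dedent('''\
        rule foo:
            build: shell command
        ''')
    with self.assertRaises(ParserError) as cm:
        parse_string(input)
    assert 'The "build" field is no longer supported.' in cm.exception.message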