Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_chmod_read_write_recursively_on_dir(self):
    """
    chmod() with RW and recurse=True on a parent directory must restore
    read/write (and, on POSIX, execute) permissions on everything below it.
    """
    test_dir = self.get_test_loc('fileutils/executable', copy=True)
    test_file = join(test_dir, 'deep1', 'deep2', 'ctags')
    test_dir2 = join(test_dir, 'deep1', 'deep2')
    parent = join(test_dir, 'deep1')
    try:
        # strip write permission from the deepest file and verify it stuck
        make_non_writable(test_file)
        assert not filetype.is_writable(test_file)
        if on_posix:
            # execute bits are only meaningful on POSIX platforms
            make_non_executable(test_file)
            assert not filetype.is_executable(test_file)
        if on_posix:
            make_non_executable(test_dir2)
            assert not filetype.is_executable(test_dir2)
        # strip write permission from the whole tree root
        make_non_writable(test_dir)
        if on_posix:
            assert not filetype.is_writable(test_dir2)
        # recursive chmod on the parent must restore permissions below it
        fileutils.chmod(parent, fileutils.RW, recurse=True)
        assert filetype.is_readable(test_dir2) is True
        assert filetype.is_writable(test_dir2)
        if on_posix:
            assert filetype.is_executable(test_dir2)
    finally:
        # always restore permissions so the temp test tree can be cleaned up
        fileutils.chmod(test_dir, fileutils.RW, recurse=True)
# NOTE(review): fragment — `test_dir`, `locs`, `result` and `expected` are
# defined outside this visible chunk; presumably this is the tail of a
# check_files-style walker (compare check_files elsewhere in this file) —
# confirm against the full source.
test_dir_path = fileutils.as_posixpath(test_dir)
for top, _, files in os.walk(test_dir):
    for f in files:
        location = os.path.join(top, f)
        locs.append(location)
        # build a POSIX path relative to test_dir, without a leading slash
        path = fileutils.as_posixpath(location)
        path = path.replace(test_dir_path, '').strip('/')
        result.append(path)
assert sorted(expected) == sorted(result)
for location in locs:
    # every collected location must be a plain, non-special, readable file
    assert filetype.is_file(location)
    assert not filetype.is_special(location)
    assert filetype.is_readable(location)
def check_files(test_dir, expected):
    """
    Walk test_dir and check that every visited directory is readable and
    that all files found are:
    * non-special,
    * readable,
    * have POSIX paths relative to test_dir that match exactly the
      `expected` sequence of paths (order-insensitive).

    If test_dir is a file, walk its parent directory instead.
    """
    result = []
    locs = []
    if filetype.is_file(test_dir):
        test_dir = fileutils.parent_directory(test_dir)
    test_dir_path = fileutils.as_posixpath(test_dir)
    for top, _, files in os.walk(test_dir):
        # the docstring promises that all dirs are readable: check it
        assert filetype.is_readable(top)
        for f in files:
            location = os.path.join(top, f)
            locs.append(location)
            # build a POSIX path relative to test_dir, without a leading slash
            path = fileutils.as_posixpath(location)
            path = path.replace(test_dir_path, '').strip('/')
            result.append(path)
    assert sorted(expected) == sorted(result)
    for location in locs:
        assert filetype.is_file(location)
        assert not filetype.is_special(location)
        # was missing: the docstring promises each file is readable
        # (consistent with the sibling walker check elsewhere in this file)
        assert filetype.is_readable(location)
def is_about_file(location):
    """Return True if `location` is an existing file with a .about extension."""
    if not filetype.is_file(location):
        return False
    return location.lower().endswith('.about')
# NOTE(review): fragment — `errors`, `src`, `dst`, `names`, `copytime`,
# `copytree`, `copyfile`, `chmod` and `R` come from the enclosing (not
# visible) copytree-style function; presumably modeled on shutil.copytree —
# confirm against the full source.
errors.extend(copytime(src, dst))
for name in names:
    srcname = os.path.join(src, name)
    dstname = os.path.join(dst, name)
    # skip anything that is not a regular file, dir or link
    if not filetype.is_regular(srcname):
        continue
    # make the source readable before copying, if it is not already
    if not filetype.is_readable(srcname):
        chmod(srcname, R, recurse=False)
    try:
        if os.path.isdir(srcname):
            copytree(srcname, dstname)
        elif filetype.is_file(srcname):
            copyfile(srcname, dstname)
    # catch the Error from the recursive copytree so that we can
    # continue with other files
    except shutil.Error as err:
        errors.extend(err.args[0])
    except EnvironmentError as why:
        errors.append((srcname, dstname, str(why)))
# re-raise all accumulated errors at once, shutil.copytree-style
if errors:
    raise shutil.Error(errors)
def is_conda_yaml(location):
    """Return True if `location` is an existing YAML file (.yaml or .yml)."""
    if not filetype.is_file(location):
        return False
    name = fileutils.file_name(location).lower()
    return name.endswith('.yaml') or name.endswith('.yml')
def parse_with_dparse(location):
    """
    Parse the dependency manifest file at `location` with dparse.
    Return early (None) for directories and for file names that are not
    one of the dparse-supported manifest names.
    """
    is_dir = filetype.is_dir(location)
    if is_dir:
        return
    file_name = fileutils.file_name(location)
    # dparse dispatches on the exact file name
    if file_name not in (filetypes.requirements_txt,
                         filetypes.conda_yml,
                         filetypes.tox_ini,
                         filetypes.pipfile,
                         filetypes.pipfile_lock):
        return
    # on Python 2 dparse expects bytes, on Python 3 it expects text
    if py2:
        mode = 'rb'
    else:
        mode = 'r'
    with open(location, mode) as f:
        content = f.read()
    # NOTE(review): this chunk appears truncated — `df` is assigned but
    # never used or returned in the visible lines; presumably the parsed
    # dependencies are processed further down — confirm against full source.
    df = dparse.parse(content, file_type=file_name)
# NOTE(review): fragment — `xev`, `source`, `has_warnings`, `has_errors`,
# `abs_location`, `shallow`, `quiet`, `verbose`, `extract_event`,
# `display_extract_summary` and the echo/extract helpers come from the
# enclosing extract CLI module and are not visible in this chunk. The
# relative indentation below is a best-effort reconstruction — confirm
# against the full source.
for e in xev.errors:
    echo_stderr('ERROR extracting: %(source)s: %(e)s' % locals(), fg='red')
for warn in xev.warnings:
    echo_stderr('WARNING extracting: %(source)s: %(warn)s' % locals(), fg='yellow')
# pick the most severe summary color: errors win over warnings
summary_color = 'green'
if has_warnings:
    summary_color = 'yellow'
if has_errors:
    summary_color = 'red'
echo_stderr('Extracting done.', fg=summary_color, reset=True)
# use for relative paths computation
len_base_path = len(abs_location)
base_is_dir = filetype.is_dir(abs_location)
extract_results = []
has_extract_errors = False
extractibles = extract_archives(abs_location, recurse=not shallow)
if not quiet:
    echo_stderr('Extracting archives...', fg='green')
with utils.progressmanager(extractibles,
                           item_show_func=extract_event, verbose=verbose) as extraction_events:
    for xev in extraction_events:
        # remember whether any completed extraction reported errors
        if xev.done and (xev.warnings or xev.errors):
            has_extract_errors = has_extract_errors or xev.errors
        extract_results.append(xev)
display_extract_summary()
def is_package_json(location):
    """Return True if `location` is an existing npm package.json file."""
    if not filetype.is_file(location):
        return False
    return fileutils.file_name(location).lower() == 'package.json'
# NOTE(review): fragment — `layer_dir`, `files`, `layer_id`, `Layer`,
# `LAYER_JSON_FILE`, `LAYER_TAR_FILE` and `sha256_digest` come from the
# enclosing (not visible) Docker image-layer loading function; the first
# print presumably sits under a missing-VERSION-file check — confirm
# against the full source.
print('Missing layer VERSION for:', layer_dir)
if not LAYER_JSON_FILE in files:
    # without a layer json there is nothing more to load: return a stub
    print('Missing layer json for:', layer_dir)
    return Layer(layer_id=layer_id)
# load data
with open(join(layer_dir, LAYER_JSON_FILE)) as layer_json:
    layer_data = json.load(layer_json, object_pairs_hook=OrderedDict)
# Note: it is possible to have an EMPTY layer.tar that is a link to another
# non-empty layer.tar
if LAYER_TAR_FILE in files:
    layer_tar = join(layer_dir, LAYER_TAR_FILE)
    layer_digest = sha256_digest(layer_tar)
    layer_size = filetype.get_size(layer_tar)
else:
    layer_digest = None
    layer_size = 0
# do not rely on this
if 'Size' in layer_data:
    del layer_data['Size']
# do not rely on this
if 'id' in layer_data:
    lid = layer_data.pop('id')
    # make some basic checks
    assert layer_id == lid
ccnf = layer_data.pop('container_config', {})