Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_get_tar_flag_from_algo(self):
assert tar_builders.get_tar_flag_from_algo('gzip') == '-z'
assert tar_builders.get_tar_flag_from_algo('bzip2') == '-j'
if not utils.is_bsd():
assert tar_builders.get_tar_flag_from_algo('xz') == '-J'
def test_pass(self):
bunch = utils.Bunch()
validator.validate(bunch)
freezer_command = '{0} {1}'.format(backup_args.trickle_command,
' '.join(sys.argv))
LOG.debug('Trickle command: {0}'.format(freezer_command))
process = subprocess.Popen(freezer_command.split(),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=os.environ.copy())
while process.poll() is None:
line = process.stdout.readline().strip()
if line != '':
print(line)
output, error = process.communicate()
if hasattr(backup_args, 'tmp_file'):
utils.delete_file(backup_args.tmp_file)
if process.returncode:
LOG.warning("Trickle Error: {0}".format(error))
LOG.info("Switching to work without trickle ...")
return run_job(backup_args, storage)
else:
return run_job(backup_args, storage)
def get_lvm_info(path):
"""
Take a file system path as argument as backup_opt_dict.path_to_backup
and return a list containing lvm_srcvol, lvm_volgroup
where the path is mounted on.
:param path: the original file system path where backup needs
to be executed
:returns: a dict containing the keys 'volgroup', 'srcvol' and 'snap_path'
"""
mount_point_path, snap_path = utils.get_mount_from_path(path)
with open('/proc/mounts', 'r') as mount_fd:
mount_points = mount_fd.readlines()
lvm_volgroup, lvm_srcvol, lvm_device = lvm_guess(
mount_point_path, mount_points, '/proc/mounts')
if not lvm_device:
mount_process = subprocess.Popen(
['mount'], stdout=subprocess.PIPE, stderr=subprocess.PIPE,
env=os.environ)
mount_out, mount_err = mount_process.communicate()
mount_points = mount_out.split('\n')
lvm_volgroup, lvm_srcvol, lvm_device = lvm_guess(
mount_point_path, mount_points, 'mount')
if not lvm_device:
def get_tar_flag_from_algo(compression):
algo = {
'gzip': '-z',
'bzip2': '-j',
'xz': '-J',
}
compression_exec = utils.get_executable_path(compression)
if not compression_exec:
raise Exception("Critical Error: {0} executable not found ".
format(compression))
return algo.get(compression)
def get_files_hashes_in_path(self):
"""
Walk the files in path computing the checksum for each one and updates
the concatenation checksum for the final result
"""
self.count = utils.walk_path(self.path, self.exclude,
self.ignorelinks, self.get_hash)
return self._increment_hash
def metadata_path(self, engine, hostname_backup_name):
return utils.path_join(self.storage_path, "metadata", engine.name,
hostname_backup_name)