Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
blivet_flags.discard_new = True
blivet_flags.selinux = conf.security.selinux
blivet_flags.dmraid = conf.storage.dmraid
blivet_flags.ibft = conf.storage.ibft
blivet_flags.multipath_friendly_names = conf.storage.multipath_friendly_names
blivet_flags.allow_imperfect_devices = conf.storage.allow_imperfect_devices
# Platform class setup depends on flags, re-initialize it.
platform.update_from_flags()
# Load plugins.
if arch.is_s390():
load_plugin_s390()
# Set the blacklist.
udev.device_name_blacklist = [r'^mtd', r'^mmcblk.+boot', r'^mmcblk.+rpmb', r'^zram', '^ndblk']
# We need this so all the /dev/disk/* stuff is set up.
udev.trigger(subsystem="block", action="change")
# Set the flags.
blivet_flags.auto_dev_updates = True
blivet_flags.selinux_reset_fcon = True
blivet_flags.keep_empty_ext_partitions = False
blivet_flags.discard_new = True
blivet_flags.selinux = conf.security.selinux
blivet_flags.dmraid = conf.storage.dmraid
blivet_flags.ibft = conf.storage.ibft
blivet_flags.multipath_friendly_names = conf.storage.multipath_friendly_names
blivet_flags.allow_imperfect_devices = conf.storage.allow_imperfect_devices
# Platform class setup depends on flags, re-initialize it.
platform.update_from_flags()
# Load plugins.
if arch.is_s390():
load_plugin_s390()
# Set the blacklist.
udev.device_name_blacklist = [r'^mtd', r'^mmcblk.+boot', r'^mmcblk.+rpmb', r'^zram', '^ndblk']
# We need this so all the /dev/disk/* stuff is set up.
udev.trigger(subsystem="block", action="change")
"""A sequence of tests of filesystem labeling.
* create the filesystem when passing an invalid label
* raise an exception when reading the filesystem
* raise an exception when relabeling the filesystem
"""
an_fs = self._fs_class(device=self.loop_devices[0], label=self._invalid_label)
if an_fs._readlabel.availability_errors or not an_fs.relabels():
self.skipTest("can not read or write label for filesystem %s" % an_fs.name)
self.assertIsNone(an_fs.create())
with self.assertRaises(FSReadLabelError):
an_fs.read_label()
an_fs.label = "an fs"
with self.assertRaises(FSError):
an_fs.write_label()
def test_set_invalid_uuid_later(self):
"""Create the filesystem and try to reassign an invalid UUID later."""
an_fs = self._fs_class(device=self.loop_devices[0])
if an_fs._writeuuid.availability_errors:
self.skipTest("can not write UUID for filesystem %s" % an_fs.name)
self.assertIsNone(an_fs.create())
an_fs.uuid = self._invalid_uuid
with six.assertRaisesRegex(self, FSError, "bad UUID format"):
an_fs.write_uuid()
self.assertEqual(an_fs.read_label(), an_fs._labelfs.default_label)
an_fs.label = "an_fs"
self.assertIsNone(an_fs.write_label())
self.assertEqual(an_fs.read_label(), an_fs.label)
an_fs.label = ""
self.assertIsNone(an_fs.write_label())
self.assertEqual(an_fs.read_label(), an_fs.label)
an_fs.label = None
with six.assertRaisesRegex(self, FSError, "default label"):
an_fs.write_label()
an_fs.label = "n" * 129
with six.assertRaisesRegex(self, FSError, "bad label format"):
an_fs.write_label()
values.append(['connection', 'id', devname, 's'])
values.append(['connection', 'interface-name', devname, 's'])
dev_spec = None
# type "802-3-ethernet"
else:
mac = _bound_hwaddr_of_device(devname)
if mac:
mac = [int(b, 16) for b in mac.split(":")]
values.append(['802-3-ethernet', 'mac-address', mac, 'ay'])
values.append(['connection', 'type', '802-3-ethernet', 's'])
values.append(['connection', 'id', devname, 's'])
values.append(['connection', 'interface-name', devname, 's'])
if blivet.arch.is_s390():
# Add s390 settings
s390cfg = _get_s390_settings(devname)
if s390cfg['SUBCHANNELS']:
subchannels = s390cfg['SUBCHANNELS'].split(",")
values.append(['802-3-ethernet', 's390-subchannels', subchannels, 'as'])
if s390cfg['NETTYPE']:
values.append(['802-3-ethernet', 's390-nettype', s390cfg['NETTYPE'], 's'])
if s390cfg['OPTIONS']:
opts = s390cfg['OPTIONS'].split(" ")
opts_dict = {k:v for k,v in (o.split("=") for o in opts)}
values.append(['802-3-ethernet', 's390-options', opts_dict, 'a{ss}'])
dev_spec = devname
try:
nm.nm_add_connection(values)
from . import arch
from . import util
import logging
log = logging.getLogger("blivet")
program_log = logging.getLogger("program")
log_bd_message = lambda level, msg: program_log.info(msg)
_HAVE_NVDIMM = False
# do not try to load nvdimm plugin on other architectures than x86_64
# nvdimm (both hardware and plugin) is not availble on other architectures
if arch.isX86(bits=64):
_REQUESTED_PLUGINS = BlockDev.plugin_specs_from_names(("nvdimm",))
try:
# do not check for dependencies during libblockdev initializtion, do runtime
# checks instead
BlockDev.switch_init_checks(False)
succ_, avail_plugs = BlockDev.try_reinit(require_plugins=_REQUESTED_PLUGINS,
reload=False,
log_func=log_bd_message)
except GLib.GError as err:
log.error("Failed to intialize the libblockdev library: %s", err)
else:
_HAVE_NVDIMM = "nvdimm" in avail_plugs
class NVDIMMDependencyGuard(util.DependencyGuard):
error_msg = "libblockdev NVDIMM functionality not available"
def run(self):
name = udev.device_get_name(self.data)
log_method_call(self, name=name)
slave_devices = self._devicetree._add_slave_devices(self.data)
device = None
if slave_devices:
try:
serial = self.data["DM_UUID"].split("-", 1)[1]
except (IndexError, AttributeError):
log.error("multipath device %s has no DM_UUID", name)
raise DeviceTreeError("multipath %s has no DM_UUID" % name)
device = MultipathDevice(name, parents=slave_devices,
sysfs_path=udev.device_get_sysfs_path(self.data),
serial=serial)
self._devicetree._add_device(device)
def time_initialize(timezone, storage, bootloader):
"""
Try to guess if RTC uses UTC time or not, set timezone.isUtc properly and
set system time from RTC using the UTC guess.
Guess is done by searching for bootable ntfs devices.
:param timezone: ksdata.timezone object
:param storage: blivet.Blivet instance
:param bootloader: bootloader.Bootloader instance
"""
if arch.is_s390():
# nothing to do on s390(x) were hwclock doesn't exist
return
if not timezone.isUtc and not flags.automatedInstall:
# if set in the kickstart, no magic needed here
threadMgr.wait(THREAD_STORAGE)
ntfs_devs = filter(lambda dev: dev.format.name == "ntfs",
storage.devices)
timezone.isUtc = not bootloader.has_windows(ntfs_devs)
cmd = "hwclock"
args = ["--hctosys"]
if timezone.isUtc:
args.append("--utc")
else:
:return: a tuple of a mount dict and swap list
"""
mounts = {}
swaps = []
path = "%s/etc/fstab" % chroot
if not os.access(path, os.R_OK):
# XXX should we raise an exception instead?
log.info("cannot open %s for read", path)
return mounts, swaps
blkid_tab = BlkidTab(chroot=chroot)
try:
blkid_tab.parse()
log.debug("blkid.tab devs: %s", list(blkid_tab.devices.keys()))
except Exception: # pylint: disable=broad-except
log_exception_info(log.info, "error parsing blkid.tab")
blkid_tab = None
crypt_tab = CryptTab(devicetree, blkid_tab=blkid_tab, chroot=chroot)
try:
crypt_tab.parse(chroot=chroot)
log.debug("crypttab maps: %s", list(crypt_tab.mappings.keys()))
except Exception: # pylint: disable=broad-except
log_exception_info(log.info, "error parsing crypttab")
crypt_tab = None
with open(path) as f:
log.debug("parsing %s", path)
for line in f.readlines():
(line, _pound, _comment) = line.partition("#")
fields = line.split(None, 4)