Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Configure the blivet flags from the installer configuration.
blivet_flags.discard_new = True
blivet_flags.selinux = conf.security.selinux

# These flags carry the same name as the storage option they are copied from.
for flag_name in ("dmraid", "ibft", "multipath_friendly_names",
                  "allow_imperfect_devices"):
    setattr(blivet_flags, flag_name, getattr(conf.storage, flag_name))

# Platform class setup depends on flags, re-initialize it.
platform.update_from_flags()

# Load plugins.
if arch.is_s390():
    load_plugin_s390()

# Set the blacklist.
udev.device_name_blacklist = [
    r'^mtd', r'^mmcblk.+boot', r'^mmcblk.+rpmb', r'^zram', '^ndblk',
]

# We need this so all the /dev/disk/* stuff is set up.
udev.trigger(subsystem="block", action="change")
def addUdevDiskDevice(self, info):
name = udev.device_get_name(info)
log_method_call(self, name=name)
sysfs_path = udev.device_get_sysfs_path(info)
serial = udev.device_get_serial(info)
bus = udev.device_get_bus(info)
vendor = util.get_sysfs_attr(sysfs_path, "device/vendor")
model = util.get_sysfs_attr(sysfs_path, "device/model")
kwargs = { "serial": serial, "vendor": vendor, "model": model, "bus": bus }
if udev.device_is_iscsi(info) and not self._cleanup:
diskType = iScsiDiskDevice
initiator = udev.device_get_iscsi_initiator(info)
target = udev.device_get_iscsi_name(info)
address = udev.device_get_iscsi_address(info)
port = udev.device_get_iscsi_port(info)
nic = udev.device_get_iscsi_nic(info)
kwargs["initiator"] = initiator
if initiator == self.iscsi.initiator:
node = self.iscsi.getNode(target, address, port, nic)
kwargs["node"] = node
kwargs["ibft"] = node in self.iscsi.ibftNodes
kwargs["nic"] = self.iscsi.ifaces.get(node.iface, node.iface)
log.info("%s is an iscsi disk", name)
else:
# qla4xxx partial offload
kwargs["node"] = None
kwargs["ibft"] = False
kwargs["nic"] = "offload:not_accessible_via_iscsiadm"
kwargs["fw_address"] = address
kwargs["fw_port"] = port
def _get_kwargs(self):
    """Return the parent's kwargs with ``uuid`` set to the disklabel UUID.

    The kwargs are first built by the parent class; ``uuid`` is then set to
    the value udev reports as the disklabel UUID for ``self.data``.
    """
    disklabel_kwargs = super()._get_kwargs()
    disklabel_uuid = udev.device_get_disklabel_uuid(self.data)
    disklabel_kwargs["uuid"] = disklabel_uuid
    return disklabel_kwargs
def _get_kwargs(self):
    """Build the keyword arguments for the DeviceFormat constructor.

    :returns: dict with the ``uuid``, ``label`` and ``serial`` read from udev
              for ``self.data``, the path of ``self.device`` and
              ``exists`` set to True
    :rtype: dict
    """
    format_kwargs = {}
    format_kwargs["uuid"] = udev.device_get_uuid(self.data)
    format_kwargs["label"] = udev.device_get_label(self.data)
    format_kwargs["device"] = self.device.path
    format_kwargs["serial"] = udev.device_get_serial(self.data)
    # The format is being described from udev data, so it is marked existing.
    format_kwargs["exists"] = True
    return format_kwargs
kwargs["parents"] = [container]
kwargs["level"] = udev.device_get_md_level(info)
kwargs["memberDevices"] = udev.device_get_md_devices(info)
kwargs["uuid"] = udev.device_get_md_uuid(info)
kwargs["exists"] = True
del kwargs["model"]
del kwargs["serial"]
del kwargs["vendor"]
del kwargs["bus"]
elif udev.device_is_dasd(info) and not self._cleanup:
diskType = DASDDevice
kwargs["busid"] = udev.device_get_dasd_bus_id(info)
kwargs["opts"] = {}
for attr in ['readonly', 'use_diag', 'erplog', 'failfast']:
kwargs["opts"][attr] = udev.device_get_dasd_flag(info, attr)
log.info("%s is a dasd device", name)
elif udev.device_is_zfcp(info):
diskType = ZFCPDiskDevice
for attr in ['hba_id', 'wwpn', 'fcp_lun']:
kwargs[attr] = udev.device_get_zfcp_attribute(info, attr=attr)
log.info("%s is a zfcp device", name)
else:
diskType = DiskDevice
log.info("%s is a disk", name)
device = diskType(name,
major=udev.device_get_major(info),
minor=udev.device_get_minor(info),
:param bool refresh_udev_cache: governs if the udev device cache should be refreshed
:returns: True if the device name corresponds to a disk, False if not
:rtype: bool
"""
if devicetree is None:
global udev_device_dict_cache
if device_name:
if udev_device_dict_cache is None or refresh_udev_cache:
# Lazy load the udev dict that contains the {device_name : udev_device,..,}
# mappings. The operation could be quite costly due to udev_settle() calls,
# so we cache it in this non-elegant way.
# An unfortunate side effect of this is that udev devices that show up after
# this function is called for the first time will not be taken into account.
udev_device_dict_cache = {udev.device_get_name(d): d for d in udev.get_devices()}
udev_device = udev_device_dict_cache.get(device_name)
return udev_device and udev.device_is_realdisk(udev_device)
else:
return False
else:
device = devicetree.get_device_by_name(device_name)
return device and device.is_disk
def _get_kwargs(self):
    """Return the parent's format kwargs extended with mdraid member data.

    Added or replaced keys:

    * ``md_uuid``  -- array UUID; left out (with a warning logged) when
      looking it up raises KeyError
    * ``uuid``     -- member-specific UUID
    * ``biosraid`` -- whether udev reports the device as a BIOS RAID member
    """
    member_kwargs = super(MDFormatPopulator, self)._get_kwargs()

    try:
        # ID_FS_UUID contains the array UUID
        array_uuid = udev.device_get_uuid(self.data)
    except KeyError:
        log.warning("mdraid member %s has no md uuid", udev.device_get_name(self.data))
    else:
        member_kwargs["md_uuid"] = array_uuid

    # reset the uuid to the member-specific value
    # this will be None for members of v0 metadata arrays
    member_kwargs["uuid"] = udev.device_get_md_device_uuid(self.data)
    member_kwargs["biosraid"] = udev.device_is_biosraid_member(self.data)
    return member_kwargs
def deviceMatches(spec):
    """Return the devices udev resolves for a device specification.

    :param str spec: device spec or glob; when it does not start with
                     "/dev/", that prefix is added and the path normalized
    :returns: the matches from udev_resolve_glob, followed by the result of
              udev_resolve_devspec when that is truthy and not already
              among the matches
    """
    if spec.startswith("/dev/"):
        full_spec = spec
    else:
        full_spec = os.path.normpath("/dev/" + spec)

    # the regular case
    matches = udev.udev_resolve_glob(full_spec)
    dev = udev.udev_resolve_devspec(full_spec)

    # udev_resolve_devspec returns None if there's no match, but we don't
    # want that ending up in the list.
    if not dev or dev in matches:
        return matches

    matches.append(dev)
    return matches