Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def run(self):
    """Create and register a MultipathDevice for the udev entry in ``self.data``.

    A device is only built when ``_add_slave_devices`` yields at least one
    slave device. Its serial is the part of the ``DM_UUID`` property that
    follows the first "-".

    :returns: the newly added MultipathDevice, or None when there are no
              slave devices
    :raises DeviceTreeError: if ``DM_UUID`` is absent or has no "-" suffix
    """
    name = udev.device_get_name(self.data)
    log_method_call(self, name=name)

    slave_devices = self._devicetree._add_slave_devices(self.data)
    if not slave_devices:
        # Nothing to build the multipath device on top of.
        return None

    try:
        serial = self.data["DM_UUID"].split("-", 1)[1]
    except (KeyError, IndexError, AttributeError):
        # KeyError: the property is missing from the mapping;
        # AttributeError: the value is not a string (e.g. None);
        # IndexError: the value contains no "-" separator.
        log.error("multipath device %s has no DM_UUID", name)
        raise DeviceTreeError("multipath %s has no DM_UUID" % name)

    device = MultipathDevice(name, parents=slave_devices,
                             sysfs_path=udev.device_get_sysfs_path(self.data),
                             serial=serial)
    self._devicetree._add_device(device)
    # Hand the device back, consistent with the other populator run() methods.
    return device
# likelihood blivet thinks the device is not active and therefore has no
# sysfs path.
# XXX We need an existing device here, so we should also be checking destroy actions.
device = self.get_device_by_name(udev.device_get_name(event.info), hidden=True)
if device is None:
device = self.get_device_by_uuid(event.info.get("UUID_SUB", udev.device_get_uuid(event.info)),
hidden=True)
if device is None and udev.device_is_dm_luks(event.info):
# Special case for first-time decrypted LUKS devices since we do not add a
# device for the decrypted/mapped device until it has been opened.
self.handle_device(event.info)
device = self.get_device_by_name(udev.device_get_name(event.info), hidden=True)
# XXX Don't change anything (except simple attributes?) if actions are being executed.
if device is None and self._event_device_is_physical_disk(event):
log.info("disk %s was added", udev.device_get_name(event.info))
self.handle_device(event.info)
elif device is not None and device.exists:
log.info("device %s was activated", device.name)
# device was activated from outside, so update the sysfs path
sysfs_path = udev.device_get_sysfs_path(event.info)
if device.sysfs_path != sysfs_path:
old_sysfs_path = device.sysfs_path
device.sysfs_path = sysfs_path
callbacks.attribute_changed(device=device, attr="sysfs_path",
old=old_sysfs_path, new=sysfs_path)
:rtype: bool
"""
if devicetree is None:
global _udev_device_dict_cache
if device_name:
if _udev_device_dict_cache is None or refresh_udev_cache:
# Lazy load the udev dict that contains the {device_name : udev_device,..,}
# mappings. The operation could be quite costly due to udev_settle() calls,
# so we cache it in this non-elegant way.
# An unfortunate side effect of this is that udev devices that show up after
# this function is called for the first time will not be taken into account.
_udev_device_dict_cache = dict()
for d in udev.get_devices():
# Add the device name to the cache.
_udev_device_dict_cache[udev.device_get_name(d)] = d
# If the device is md, add the md name as well.
if udev.device_is_md(d) and udev.device_get_md_name(d):
_udev_device_dict_cache[udev.device_get_md_name(d)] = d
udev_device = _udev_device_dict_cache.get(device_name)
return udev_device and udev.device_is_disk(udev_device)
else:
return False
else:
device = devicetree.get_device_by_name(device_name)
return device and device.is_disk
def handleUdevDMRaidMemberFormat(self, info, device):
    """Attach a dmraid member device to the raid sets that are already known.

    Does nothing when ``flags.dmraid`` is off. Otherwise the raid sets the
    member belongs to are looked up via ``blockdev.dm.get_member_raid_sets``
    and, for every set that ``getDeviceByName`` can find, ``device`` is
    appended to that array's ``parents``.

    :param info: udev entry describing the member device
    :param device: the member device object
    """
    # Respect the global switch: with dmraid disabled no set is touched.
    if not flags.dmraid:
        return

    log_method_call(self, name=device.name, type=device.format.type)

    member_name = udev.device_get_name(info)
    member_uuid = udev.device_get_uuid(info)
    member_major = udev.device_get_major(info)
    member_minor = udev.device_get_minor(info)

    # Which raid sets claim this member?
    raid_set_names = blockdev.dm.get_member_raid_sets(member_uuid, member_name,
                                                      member_major, member_minor)
    if len(raid_set_names) == 0:
        log.warning("dmraid member %s does not appear to belong to any "
                    "array", device.name)
        return

    for raid_set_name in raid_set_names:
        array = self.getDeviceByName(raid_set_name, incomplete=True)
        if array is None:
            continue
        # The array is already in the tree; register this member as a parent.
        array.parents.append(device)
def addUdevDMDevice(self, info):
# Look up the device-mapper device described by the udev entry `info`,
# attempting a LUKS map-name fix-up when no device of that name is known.
# NOTE(review): this excerpt stops at the trailing "try once more" comment;
# the remainder of the method is not visible here.
name = udev.device_get_name(info)
log_method_call(self, name=name)
sysfs_path = udev.device_get_sysfs_path(info)
# NOTE(review): presumably resolves the devices this DM device sits on top
# of; _addSlaveDevices is defined elsewhere -- confirm.
slave_devices = self._addSlaveDevices(info)
device = self.getDeviceByName(name)
# if this is a luks device whose map name is not what we expect,
# fix up the map name and see if that sorts us out
# The fix-up is only considered for dm-luks entries, and only when
# self._cleanup is set or flags.installer_mode is off.
handle_luks = (udev.device_is_dm_luks(info) and
(self._cleanup or not flags.installer_mode))
if device is None and handle_luks and slave_devices:
# Only the first slave is used: record the observed map name on its format,
# then re-run LUKS format handling with that slave's own udev entry.
slave_dev = slave_devices[0]
slave_dev.format.mapName = name
slave_info = udev.get_device(slave_dev.sysfsPath)
self.handleUdevLUKSFormat(slave_info, slave_dev)
# try once more to get the device
def run(self):
    """Create an IntegrityDevice for the udev entry held in ``self.data``.

    Whatever ``_add_slave_devices`` returns for the entry becomes the new
    device's parents. The device is flagged as existing, handed to the
    device tree via ``_add_device`` and returned.
    """
    backing_devices = self._devicetree._add_slave_devices(self.data)

    dev_name = udev.device_get_name(self.data)
    dev_sysfs_path = udev.device_get_sysfs_path(self.data)
    integrity_device = IntegrityDevice(
        dev_name,
        sysfs_path=dev_sysfs_path,
        parents=backing_devices,
        exists=True,
    )

    self._devicetree._add_device(integrity_device)
    return integrity_device
def _create_event(self, *args, **kwargs):
    """Wrap the first positional argument in an Event.

    The Event receives that object's ``action``, the name udev reports for
    it, and the object itself. Remaining positional and keyword arguments
    are ignored.
    """
    source = args[0]
    action = source.action
    device_name = udev.device_get_name(source)
    return Event(action, device_name, source)
def addUdevDiskDevice(self, info):
# Gather the identifying attributes of the disk described by the udev entry
# `info` and select the disk class to use for it.
# NOTE(review): this excerpt ends inside the iSCSI branch; how the collected
# values are consumed is not visible here.
name = udev.device_get_name(info)
log_method_call(self, name=name)
sysfs_path = udev.device_get_sysfs_path(info)
serial = udev.device_get_serial(info)
bus = udev.device_get_bus(info)
# Vendor and model are read from sysfs attributes rather than from udev.
vendor = util.get_sysfs_attr(sysfs_path, "device/vendor")
model = util.get_sysfs_attr(sysfs_path, "device/model")
# Attributes collected so far, bundled as keyword arguments.
kwargs = { "serial": serial, "vendor": vendor, "model": model, "bus": bus }
# iSCSI-specific handling is skipped when self._cleanup is set.
if udev.device_is_iscsi(info) and not self._cleanup:
diskType = iScsiDiskDevice
# Connection details of the iSCSI session as reported by udev.
initiator = udev.device_get_iscsi_initiator(info)
target = udev.device_get_iscsi_name(info)
address = udev.device_get_iscsi_address(info)
port = udev.device_get_iscsi_port(info)
nic = udev.device_get_iscsi_nic(info)
def run(self):
    """Create a LUKSDevice for the udev entry held in ``self.data``.

    Whatever ``_add_slave_devices`` returns for the entry becomes the new
    device's parents. The device is flagged as existing, handed to the
    device tree via ``_add_device`` and returned.
    """
    backing_devices = self._devicetree._add_slave_devices(self.data)

    map_name = udev.device_get_name(self.data)
    map_sysfs_path = udev.device_get_sysfs_path(self.data)
    luks_device = LUKSDevice(
        map_name,
        sysfs_path=map_sysfs_path,
        parents=backing_devices,
        exists=True,
    )

    self._devicetree._add_device(luks_device)
    return luks_device