Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def addUdevDMDevice(self, info):
    """Find, or build, the device-mapper device described by *info*.

    The device is first looked up by the name udev reports.  When that
    fails, two fallbacks are tried in order: re-pointing the LUKS map name
    of the first slave device at this name and looking again, then
    creating a protected, non-controllable DMDevice for a live-CD image.

    :param info: udev information for the device-mapper device
    """
    # NOTE(review): as shown, this method neither registers the new
    # DMDevice nor returns anything -- confirm whether the excerpt is
    # truncated.
    name = udev.device_get_name(info)
    log_method_call(self, name=name)
    sysfs_path = udev.device_get_sysfs_path(info)
    slave_devices = self._addSlaveDevices(info)
    device = self.getDeviceByName(name)

    # a luks device whose map name is not what we expect may only need its
    # map name fixed up; only attempt that when cleaning up or when not
    # running in installer mode
    may_fix_luks_map = (udev.device_is_dm_luks(info) and
                        (self._cleanup or not flags.installer_mode))
    if device is None:
        if may_fix_luks_map and slave_devices:
            backing_dev = slave_devices[0]
            backing_dev.format.mapName = name
            backing_info = udev.get_device(backing_dev.sysfsPath)
            self.handleUdevLUKSFormat(backing_info, backing_dev)
            # the fixed-up map name may have made the device findable
            device = self.getDeviceByName(name)

    # still nothing: the livecd OS image(s) get a device of their own
    if device is None and udev.device_is_dm_livecd(info):
        device = DMDevice(
            name,
            dmUuid=info.get('DM_UUID'),
            sysfsPath=sysfs_path,
            exists=True,
            parents=[slave_devices[0]],
        )
        device.protected = True
        device.controllable = False
def run(self):
super(DMRaidFormatPopulator, self).run()
# if dmraid usage is disabled skip any dmraid set activation
if not flags.dmraid:
return
log_method_call(self, name=self.device.name)
name = udev.device_get_name(self.data)
uuid = udev.device_get_uuid(self.data)
major = udev.device_get_major(self.data)
minor = udev.device_get_minor(self.data)
# Have we already created the DMRaidArrayDevice?
rs_names = blockdev.dm.get_member_raid_sets(name, uuid, major, minor)
if len(rs_names) == 0:
log.warning("dmraid member %s does not appear to belong to any "
"array", self.device.name)
return
for rs_name in rs_names:
def update_from_flags(self):
    """Make GPT the default disklabel type when ``flags.gpt`` is set.

    Nothing happens unless the flag is set.  If the platform's disklabel
    types do not include "gpt", a warning naming the first platform type
    is logged and the default is left alone.
    """
    if not flags.gpt:
        return

    label_cls = get_device_format_class("disklabel")
    platform_types = label_cls.get_platform_label_types()
    if "gpt" in platform_types:
        label_cls.set_default_label_type("gpt")
        return

    log.warning("GPT is not a supported disklabel on this platform. Using default "
                "disklabel %s instead.", platform_types[0])
def dict(self):
    """Return the parent class's ``dict`` extended with disklabel details.

    When ``flags.testing`` is set the parent's dictionary is returned
    without the extra entries.
    """
    summary = super(DiskLabel, self).dict
    if not flags.testing:
        # keyword order mirrors the evaluation order of the attribute reads
        summary.update(
            label_type=self.label_type,
            partition_count=len(self.partitions),
            sector_size=self.sector_size,
            offset=self.get_alignment().offset,
            grain_size=self.get_alignment().grainSize,
        )
    return summary
def enable(self):
    """ Enable monitoring and handling of block device uevents. """
    super(UdevEventManager, self).enable()

    # netlink monitor restricted to the "block" filter
    block_monitor = pyudev.Monitor.from_netlink(udev.global_udev)
    block_monitor.filter_by("block")

    # every event the monitor delivers is passed to self.handle_event
    observer = pyudev.MonitorObserver(block_monitor, self.handle_event)
    self._pyudev_observer = observer
    observer.start()

    flags.uevents = True
def update_from_flags(self):
    """Make GPT the default disklabel type when ``flags.gpt`` is set.

    Nothing happens unless the flag is set.  If the platform's disklabel
    types do not include "gpt", a warning naming the first platform type
    is logged and the default is left alone.
    """
    if not flags.gpt:
        return

    label_cls = get_device_format_class("disklabel")
    platform_types = label_cls.get_platform_label_types()
    if "gpt" in platform_types:
        label_cls.set_default_label_type("gpt")
        return

    log.warning("GPT is not a supported disklabel on this platform. Using default "
                "disklabel %s instead.", platform_types[0])
super(LUKSFormatPopulator, self).run()
if not self.device.format.uuid:
log.info("luks device %s has no uuid", self.device.path)
return
# look up or create the mapped device
if not self._devicetree.get_device_by_name(self.device.format.map_name):
passphrase = luks_data.luks_devs.get(self.device.format.uuid)
if self.device.format.configured:
pass
elif passphrase:
self.device.format.passphrase = passphrase
elif self.device.format.uuid in luks_data.luks_devs:
log.info("skipping previously-skipped luks device %s",
self.device.name)
elif self._devicetree._cleanup or flags.testing:
# if we're only building the devicetree so that we can
# tear down all of the devices we don't need a passphrase
if self.device.format.status:
# this makes device.configured return True
self.device.format.passphrase = 'yabbadabbadoo'
else:
# Try each known passphrase. Include luks_data.luks_devs values in case a
# passphrase has been set for a specific device without a full
# reset/populate, in which case the new passphrase would not be
# in luks_data.passphrases.
passphrases = luks_data.passphrases + list(luks_data.luks_devs.values())
for passphrase in passphrases:
self.device.format.passphrase = passphrase
try:
self.device.format.setup()
except blockdev.BlockDevError:
# we're only removing partitions that don't physically exist
continue
if part.isExtended:
# these get removed last
continue
part.disk.format.partedDisk.removePartition(part.partedPartition)
part.partedPartition = None
part.disk = None
for disk in disks:
# remove empty extended so it doesn't interfere
extended = disk.format.extendedPartition
if extended and not disk.format.logicalPartitions and \
(flags.installer_mode or
extended not in (p.partedPartition for p in all_partitions)):
log.debug("removing empty extended partition from %s", disk.name)
disk.format.partedDisk.removePartition(extended)
def disable(self):
    """ Disable monitoring and handling of block device uevents. """
    observer = self._pyudev_observer
    if observer:
        # stop the running observer and forget it
        observer.stop()
        self._pyudev_observer = None

    flags.uevents = False
def hidden(self):
    """Return the parent class's ``hidden`` value if truthy, else ``self.biosraid``."""
    inherited = super(MDRaidMember, self).hidden
    return inherited if inherited else self.biosraid
@property
def container_uuid(self):
    """Alias for ``md_uuid``: reading it returns ``self.md_uuid``."""
    return self.md_uuid


@container_uuid.setter
def container_uuid(self, uuid):
    """Store *uuid* in ``self.md_uuid``."""
    self.md_uuid = uuid
# nodmraid -> Whether to use BIOS RAID or not
# Note the anaconda cmdline has not been parsed yet when we're first imported,
# so we cannot use flags.dmraid here
# NOTE(review): the condition below does read flags.dmraid, which contradicts
# the note above -- confirm which of the two is intended.
if not flags.noiswmd and flags.dmraid:
MDRaidMember._udev_types.append("isw_raid_member")
register_device_format(MDRaidMember)