Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def list_for_each_spl_kmem_cache() -> Iterable[drgn.Object]:
    """
    Yield every entry linked on the global ``spl_kmem_cache_list``.

    The list head is looked up through ``sdb.get_object`` and walked as a
    list of ``spl_kmem_cache_t`` entries chained through their ``skc_list``
    member.
    """
    list_head = sdb.get_object("spl_kmem_cache_list").address_of_()
    yield from list_for_each_entry("spl_kmem_cache_t", list_head, "skc_list")
def for_each_child_cache(root_cache: drgn.Object) -> Iterable[drgn.Object]:
    """
    Yield each ``struct kmem_cache`` linked on ``root_cache``'s child list.

    ``root_cache`` must be a ``struct kmem_cache *``. The children are
    found on ``memcg_params.children`` and are chained through their own
    ``memcg_params.children_node`` member.
    """
    assert root_cache.type_.type_name() == 'struct kmem_cache *'
    children_head = root_cache.memcg_params.children.address_of_()
    yield from list_for_each_entry("struct kmem_cache", children_head,
                                   "memcg_params.children_node")
def _for_each_block_device(prog):
    """
    Yield each device linked on ``block_class``'s ``klist_devices`` list.

    The list node (``knode_class``) is embedded either in
    ``struct device_private`` or directly in ``struct device``. Which one
    applies is detected once by inspecting the members of
    ``struct device_private`` and remembered in ``prog.cache``.
    """
    cache_key = 'knode_class_in_device_private'
    try:
        in_private = prog.cache[cache_key]
    except KeyError:
        # We need a proper has_member(), but this is fine for now.
        in_private = False
        for member in prog.type('struct device_private').members:
            if member[1] == 'knode_class':
                in_private = True
                break
        prog.cache[cache_key] = in_private

    head = prog['block_class'].p.klist_devices.k_list.address_of_()

    if not in_private:
        # The list node is embedded directly in struct device.
        yield from list_for_each_entry('struct device', head,
                                       'knode_class.n_node')
        return

    # The list node is embedded in device_private; hand back what its
    # ``device`` member refers to.
    for private in list_for_each_entry('struct device_private', head,
                                       'knode_class.n_node'):
        yield private.device
def for_each_partial_slab_in_cache(cache: drgn.Object) -> Iterable[drgn.Object]:
    """
    Yield each ``struct page`` on the partial list of ``cache``.

    ``cache`` must be a ``struct kmem_cache *``. When ``cache`` is a root
    cache, the partial lists of all of its child caches are walked
    afterwards as well (recursively).
    """
    assert cache.type_.type_name() == 'struct kmem_cache *'

    # Only node[0] is consulted here.
    # NOTE(review): this presumably assumes a single memory node; caches
    # spread across several nodes may not be fully covered - confirm.
    partial_head = cache.node[0].partial.address_of_()
    yield from list_for_each_entry("struct page", partial_head, "lru")

    if not is_root_cache(cache):
        return
    for child_cache in for_each_child_cache(cache):
        yield from for_each_partial_slab_in_cache(child_cache)
def _for_each_block_device(prog):
    """
    Iterate over the devices registered on ``block_class``.

    Walks ``block_class.p.klist_devices.k_list``. If
    ``struct device_private`` has a ``knode_class`` member, the list
    entries are ``struct device_private`` and their ``device`` member is
    yielded; otherwise the entries are ``struct device`` and are yielded
    as-is. The outcome of the member check is stored in ``prog.cache`` so
    it is only computed once per program.
    """
    key = 'knode_class_in_device_private'
    try:
        knode_in_private = prog.cache[key]
    except KeyError:
        # We need a proper has_member(), but this is fine for now.
        private_members = prog.type('struct device_private').members
        knode_in_private = any(
            member[1] == 'knode_class' for member in private_members)
        prog.cache[key] = knode_in_private

    devices_head = prog['block_class'].p.klist_devices.k_list.address_of_()
    node_member = 'knode_class.n_node'

    if knode_in_private:
        yield from (
            entry.device
            for entry in list_for_each_entry('struct device_private',
                                             devices_head, node_member))
    else:
        yield from list_for_each_entry('struct device', devices_head,
                                       node_member)
"""An implementation of lsmod(8) using drgn"""
from drgn.helpers.linux.list import list_for_each_entry
# Header row, then one row per entry on the kernel's ``modules`` list.
print('Module Size Used by')
for module in list_for_each_entry('struct module',
                                  prog['modules'].address_of_(), 'list'):
    module_name = module.name.string_().decode()
    # Reported size is the sum of the init and core layout sizes.
    total_size = (module.init_layout.size + module.core_layout.size).value_()
    # One is subtracted from the raw reference counter for display.
    use_count = module.refcnt.counter.value_() - 1
    print(f'{module_name:19} {total_size:>8} {use_count}', end='')

    # Append the names from ``source_list``: a space before the first
    # name, commas between the following ones.
    users = list_for_each_entry('struct module_use',
                                module.source_list.address_of_(),
                                'source_list')
    for index, use in enumerate(users):
        separator = ',' if index else ' '
        print(separator, end='')
        print(use.source.name.string_().decode(), end='')
    print()
def _call(self, objs: Iterable[drgn.Object]) -> Iterable[drgn.Object]:
    """
    Treat each input object as a list head and yield the list's entries.

    The entry type comes from ``self.args.struct_name`` (validated through
    ``get_valid_struct_name``) and the linking member from
    ``self.args.member``.

    Raises:
        sdb.CommandError: if walking a list raises ``LookupError``. The
            original ``LookupError`` is attached as the explicit cause.
    """
    sname = get_valid_struct_name(self, self.args.struct_name)
    for obj in objs:
        try:
            yield from list_for_each_entry(sname, obj, self.args.member)
        except LookupError as err:
            # Chain explicitly so the traceback reports the lookup failure
            # as the direct cause rather than as an unrelated error raised
            # during handling.
            raise sdb.CommandError(self.name, str(err)) from err