def test_asized(self):
    '''Test asizeof.asized()
    '''
    self.assertEqual(list(asizeof.asized(detail=2)), [])
    self.assertRaises(KeyError, asizeof.asized, all=True)
    sized = asizeof.asized(Foo(42), detail=2)
    self.assertEqual(sized.name, 'Foo')
    refs = [ref for ref in sized.refs if ref.name == '__dict__']
    self.assertEqual(len(refs), 1)
    self.assertEqual(refs[0], sized.get('__dict__'))
    refs = [ref for ref in refs[0].refs if ref.name == '[V] data: 42']
    self.assertEqual(len(refs), 1, refs)
    i = 42
    self.assertEqual(refs[0].size, asizeof.asizeof(i), refs[0].size)
    # Size multiple objects at once
    sizer = asizeof.Asizer()
    sized_objs = sizer.asized(Foo(3), Foo(4), detail=2)
    self.assertEqual(len(sized_objs), 2)
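For orientation, a minimal sketch of how the Asized tree returned by asizeof.asized(..., detail=...) can be walked; the sample object and the walk helper are illustrative, not part of the test suite:

from pympler import asizeof

def walk(node, indent=0):
    # Each Asized node carries .name, .size (deep size including
    # referents) and a tuple of child nodes in .refs.
    print('%s%s: %d bytes' % (' ' * indent, node.name, node.size))
    for ref in node.refs:
        walk(ref, indent + 2)

walk(asizeof.asized([1, 2, 3], detail=2))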
def test_asizer(self):
    '''Test Asizer properties.
    '''
    sizer = asizeof.Asizer()
    obj = 'unladen swallow'
    mutable = [obj]
    sizer.asizeof(obj)
    self.assertEqual(sizer.total, asizeof.asizeof(obj))
    sizer.asizeof(mutable, mutable)
    self.assertEqual(sizer.duplicate, 2)  # obj seen 3x!
    self.assertEqual(sizer.total, asizeof.asizeof(obj, mutable))
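A small hedged illustration of the total/duplicate accounting exercised above; the exact byte counts vary by interpreter and platform, and the expected values are inferred from the test rather than documented here:

from pympler import asizeof

sizer = asizeof.Asizer()
word = 'unladen swallow'
sizer.asizeof(word, word)  # the same object passed twice
print(sizer.duplicate)     # expected: 1 -- the second sighting is a duplicate
print(sizer.total)         # expected to equal asizeof.asizeof(word)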
def test_exclude_types(self):
    '''Test Asizer.exclude_types().
    '''
    sizer = asizeof.Asizer()
    sizer.exclude_types(Foo)
    self.assertEqual(sizer.asizeof(Foo('ignored')), 0)
def _annotate_objects(self):
    """
    Extract meta-data describing the stored objects.
    """
    self.metadata = []
    sizer = Asizer()
    sizes = sizer.asizesof(*self.objects)
    self.total_size = sizer.total
    for obj, sz in zip(self.objects, sizes):
        md = _MetaObject()
        md.size = sz
        md.id = id(obj)
        try:
            md.type = obj.__class__.__name__
        except (AttributeError, ReferenceError):  # pragma: no cover
            md.type = type(obj).__name__
        md.str = safe_repr(obj, clip=128)
        self.metadata.append(md)
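As an illustration, the metadata collected above could be dumped with a loop like the following (a hypothetical snippet inside the same class; only the _MetaObject fields set above are used):

for md in self.metadata:
    # type name, object id, deep size and a clipped repr
    print('%-20s 0x%08x %10d %s' % (md.type, md.id, md.size, md.str))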
    operating system. The overhead of the Heapmonitor structure is also
    computed.
    """
    ts = _get_time()
    try:
        # Snapshots can be taken asynchronously. Prevent race conditions
        # when two snapshots are taken at the same time. TODO: It is not
        # clear what happens when memory is allocated/released while this
        # function executes, but it will likely lead to inconsistencies.
        # Either pause all other threads or don't size individual objects
        # in asynchronous mode.
        if _snapshot_lock is not None:
            _snapshot_lock.acquire()
        sizer = asizeof.Asizer()
        objs = [to.ref() for to in list(tracked_objects.values())]
        sizer.exclude_refs(*objs)
        # The objects need to be sized in a deterministic order. Sorting
        # by creation date should at least work for non-parallel
        # execution. The "proper" fix would be to handle shared data
        # separately.
        tos = list(tracked_objects.values())
        tos.sort(key=lambda x: x.birth)
        for to in tos:
            to.track_size(ts, sizer)
        fp = Footprint()
        fp.timestamp = ts
def find_garbage(sizer=None, graphfile=None, prune=1):
    """
    Let the garbage collector identify reference cycles.

    First, the garbage collector runs and saves the garbage into
    gc.garbage. The leaves of the reference graph are pruned so that only
    objects directly involved in actual cycles remain. The remaining
    garbage elements are then sized (which includes the pruned leaf
    sizes) and annotated. If a graphfile is passed and garbage was
    detected, the garbage is written out in Graphviz format.

    The total number of garbage objects and the annotated cycle elements
    are returned.
    """
    if not sizer:
        sizer = asizeof.Asizer()
    gc.set_debug(gc.DEBUG_SAVEALL)
    gc.collect()
    total = len(gc.garbage)
    cnt = 0
    cycles = gc.garbage[:]
    if prune:
        # Repeatedly strip leaf objects until the set stops shrinking;
        # what remains are the objects that take part in cycles.
        while cnt != len(cycles):
            cnt = len(cycles)
            cycles = eliminate_leafs(cycles)
    edges = get_edges(cycles)
    garbage = []
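The pruning loop relies on an eliminate_leafs helper. A minimal sketch of what such a helper could look like, assuming a leaf is an object that references no other object in the candidate set; this is a sketch under that assumption, not necessarily the module's actual implementation:

import gc

def eliminate_leafs(objects):
    # Keep only objects that reference at least one other object in the
    # set; anything else cannot take part in a cycle within the set.
    ids = set(id(obj) for obj in objects)
    result = []
    for obj in objects:
        referents = set(id(ref) for ref in gc.get_referents(obj))
        if referents & ids:
            result.append(obj)
    return result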
def visualize_ref_cycles(fname):
    """
    Print reference cycles of collectable garbage to a file which can be
    processed by Graphviz.

    This function collects the reported garbage. Therefore, subsequent
    invocations of `print_garbage_stats` will not report the same objects
    again.
    """
    with open(fname, 'w') as fobj:
        sizer = asizeof.Asizer()
        total, garbage = find_garbage(sizer, fobj)
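A brief usage note: the written file is plain Graphviz source, so it can be rendered with the standard Graphviz tools (the filenames below are illustrative):

visualize_ref_cycles('cycles.dot')
# Then, from a shell:
#   dot -Tpng cycles.dot -o cycles.png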
def print_garbage_stats(fobj=sys.stdout):
    """
    Print statistics related to garbage/leaks.

    This function collects the reported garbage. Therefore, subsequent
    invocations of `print_garbage_stats` will not report the same objects
    again.
    """
    sizer = asizeof.Asizer()
    total, garbage = find_garbage(sizer)
    sz = sizer.total
    cnt = len(garbage)
    if cnt:
        _log_garbage(garbage, fobj)
    fobj.write('Garbage: %8d collected objects (%6d in cycles): %12s\n' %
               (total, cnt, _pp(sz)))
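Typical usage, shown as a sketch (the log filename is illustrative); because find_garbage collects the garbage, a second call will not report the same objects again:

print_garbage_stats()               # report to sys.stdout (the default)
with open('garbage.log', 'w') as log:
    print_garbage_stats(fobj=log)   # or divert the report to a file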
    The overhead of the `ClassTracker` structure is also computed.

    Snapshots can be taken asynchronously. The function is protected with
    a lock to prevent race conditions.
    """
    try:
        # TODO: It is not clear what happens when memory is allocated or
        # released while this function is executed, but it will likely
        # lead to inconsistencies. Either pause all other threads or
        # don't size individual objects in asynchronous mode.
        self.snapshot_lock.acquire()
        timestamp = _get_time()
        sizer = asizeof.Asizer()
        objs = [tobj.ref() for tobj in list(self.objects.values())]
        sizer.exclude_refs(*objs)
        # The objects need to be sized in a deterministic order. Sorting
        # by creation date should at least work for non-parallel
        # execution. The "proper" fix would be to handle shared data
        # separately.
        tracked_objects = list(self.objects.values())
        tracked_objects.sort(key=lambda x: x.birth)
        for tobj in tracked_objects:
            tobj.track_size(timestamp, sizer)
        snapshot = Snapshot()
        snapshot.timestamp = timestamp
        snapshot.tracked_total = sizer.total
        if compute_total: