Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_itemsize(self):
    '''Check asizeof.itemsize() against known per-item sizes.'''
    # Types that expose their item size directly via __itemsize__.
    for obj in (1, True, None, ()):
        self.assertEqual(asizeof.itemsize(obj), type(obj).__itemsize__)
    # Containers whose per-entry size is tracked by asizeof internals.
    expected = (
        ({}, asizeof._sizeof_CPyDictEntry),
        (set(), asizeof._sizeof_Csetentry),
    )
    for obj, size in expected:
        self.assertEqual(asizeof.itemsize(obj), size)
def test_typedefs(self): # remove?
'''Test showing all basic _typedefs'''
t = len(asizeof._typedefs)
w = len(str(t)) * ' '
self._printf('%s%d type definitions: basic- and itemsize (leng), kind ... %s', os.linesep, t, '-type[def]s')
# Print every registered typedef, sorted by its printable name.
for k, v in sorted((asizeof._prepr(k), v) for k, v in asizeof._items(asizeof._typedefs)):
s = '%(base)s and %(item)s%(leng)s, %(kind)s%(code)s' % v.format()
self._printf('%s %s: %s', w, k, s)
# NOTE(review): the bare docstring below looks like the header of a
# separate test method whose `def` line was lost when this file's
# indentation was stripped — confirm against the original source.
'''Test int and long examples'''
# Python 2/3 detection: `long` raises NameError on Python 3, so the
# except branch builds the same big integers with plain int.
try:
_L5d = long(1) << 64
_L17d = long(1) << 256
t = '/'
except NameError:
_L5d = 1 << 64
_L17d = 1 << 256
t = ''
self._printf('%sasizeof(%s, align=%s, limit=%s) ... %s', os.linesep, t, 0, 0, '-int')
# Size a spread of small, large and huge (positive and negative) numbers.
for o in (1024, 1000000000,
1.0, 1.0e100, 1024, 1000000000,
self.MAX, 1 << 32, _L5d, -_L5d, _L17d, -_L17d):
self._printf(" asizeof(%s) is %s (%s + %s * %s)", _repr(o), asizeof.asizeof(o, align=0, limit=0),
asizeof.basicsize(o), asizeof.leng(o), asizeof.itemsize(o))
def test_iterator(self):
    '''Test iterator examples'''
    self._printf('%sasizeof(%s, code=%s) ... %s', os.linesep, '', False, '-iter[ator]')
    # Iterators over a variety of container types, including empty
    # containers and the three dict view iterators.
    str_it = iter('0123456789')
    empty_str_it = iter('')
    empty_dict_it = iter({})
    items_it = iter(asizeof._items({1: 1}))
    keys_it = iter(asizeof._keys({2: 2, 3: 3}))
    values_it = iter(asizeof._values({4: 4, 5: 5, 6: 6}))
    empty_list_it = iter([])
    empty_tuple_it = iter(())
    # Size all of them twice: shallow (limit=0), then deep (limit=9).
    for depth in (0, 9):
        asizeof.asizesof(str_it, empty_str_it, empty_dict_it, items_it,
                         keys_it, values_it, empty_list_it, empty_tuple_it,
                         limit=depth, code=False, stats=1)
def fit(self, X, y=None, **fit_params):
    """Fit the wrapped encoder while recording memory and timing stats.

    Records, as attributes on ``self``:
      original_df_mem     -- deep memory footprint of the input frame X
      blank_encoder_mem   -- asizeof size of the encoder before fitting
      fit_peak_mem        -- peak memory observed while encoder.fit runs
      fit_encoder_time    -- elapsed seconds spent in encoder.fit
      trained_encoder_mem -- asizeof size of the encoder after fitting

    Returns ``self`` (scikit-learn ``fit`` convention).
    """
    self.original_df_mem = X.memory_usage(deep=True).sum()
    self.blank_encoder_mem = asizeof.asizeof(self.encoder)
    # perf_counter() is monotonic; time.time() is wall-clock and can jump
    # (NTP adjustment, DST), which would corrupt the measured duration.
    start_time = time.perf_counter()
    self.fit_peak_mem = memory_usage(proc=(self.encoder.fit, (X, y)), max_usage=True)[0]
    self.fit_encoder_time = time.perf_counter() - start_time
    self.trained_encoder_mem = asizeof.asizeof(self.encoder)
    return self
def set_object(self, value):
    """Store *value*, caching its hash, file name and byte size.

    Does nothing when *value* is None.
    """
    if value is None:
        return
    digest = self.compute_hash(value)
    self.object_property = value
    # The hash doubles as the on-disk file name.
    self._hash = digest
    self.file_name = digest
    self.byte_size = asizeof.asizeof(value)
def get_license_index(rules=None):
    """
    Return a LicenseIndex built from a list of rules.
    """
    # Fall back to the full rule set when none (or an empty list) is given.
    rules = rules or get_all_rules()
    if DEBUG_PERF:
        from pympler import asizeof  # @UnresolvedImport
        print('Memory size of rules:', asizeof.asizeof(rules))
    idx = LicenseIndex(rules)
    if DEBUG_PERF:
        print('Memory size of index:', asizeof.asizeof(idx))
    return idx
def get_obj_referents(oid):
    """Describe every referent of the object identified by *oid*.

    Returns {'referents': {name: (ref, type name, clipped repr, size)}}.
    """
    obj = get_obj(oid)
    # Plain dicts (exact type, not subclasses) get meaningful per-key
    # names; anything else falls back to the repr of each referent's type.
    if type(obj) is dict:
        named_objects = asizeof.named_refs(obj)
    else:
        named_objects = [(repr(type(r)), r)
                         for r in asizeof._getreferents(obj)]
    referents = {
        name: (get_ref(o), type(o).__name__,
               safe_repr(o, clip=48), asizeof.asizeof(o))
        for name, o in named_objects
    }
    return dict(referents=referents)
The overhead of the `ClassTracker` structure is also computed.
Snapshots can be taken asynchronously. The function is protected with a
lock to prevent race conditions.
"""
# NOTE(review): truncated fragment — the enclosing method's `def` line
# (above) and this `try`'s matching `except`/`finally` (below, where the
# lock is presumably released) lie outside the visible chunk; code left
# byte-identical.
try:
# TODO: It is not clear what happens when memory is allocated or
# released while this function is executed but it will likely lead
# to inconsistencies. Either pause all other threads or don't size
# individual objects in asynchronous mode.
self.snapshot_lock.acquire()
timestamp = _get_time()
sizer = asizeof.Asizer()
# Exclude the tracker's own weakly-held objects from sizing.
objs = [tobj.ref() for tobj in list(self.objects.values())]
sizer.exclude_refs(*objs)
# The objects need to be sized in a deterministic order. Sort the
# objects by its creation date which should at least work for
# non-parallel execution. The "proper" fix would be to handle
# shared data separately.
tracked_objects = list(self.objects.values())
tracked_objects.sort(key=lambda x: x.birth)
for tobj in tracked_objects:
tobj.track_size(timestamp, sizer)
snapshot = Snapshot()
snapshot.timestamp = timestamp
snapshot.tracked_total = sizer.total
The overhead of the `ClassTracker` structure is also computed.
Snapshots can be taken asynchronously. The function is protected with a
lock to prevent race conditions.
"""
# NOTE(review): second truncated copy of the same snapshot routine — the
# `def` line, the `except`/`finally` closing this `try`, and the body of
# the `if compute_total:` at the end are all outside the visible chunk;
# code left byte-identical.
try:
# TODO: It is not clear what happens when memory is allocated or
# released while this function is executed but it will likely lead
# to inconsistencies. Either pause all other threads or don't size
# individual objects in asynchronous mode.
self.snapshot_lock.acquire()
timestamp = _get_time()
sizer = asizeof.Asizer()
# Exclude the tracker's own weakly-held objects from sizing.
objs = [tobj.ref() for tobj in list(self.objects.values())]
sizer.exclude_refs(*objs)
# The objects need to be sized in a deterministic order. Sort the
# objects by its creation date which should at least work for non-parallel
# execution. The "proper" fix would be to handle shared data separately.
tracked_objects = list(self.objects.values())
tracked_objects.sort(key=lambda x: x.birth)
for tobj in tracked_objects:
tobj.track_size(timestamp, sizer)
snapshot = Snapshot()
snapshot.timestamp = timestamp
snapshot.tracked_total = sizer.total
if compute_total: