Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_dealloc(self):
    """Check what is recorded when started greenlets are deleted and collected.

    Two greenlets running ``fmain`` are started with a shared list. The
    test asserts that the list is still empty after both have been
    switched into, and that it gains one ``greenlet.GreenletExit`` entry
    each time a greenlet is deleted and ``gc.collect()`` is run.

    NOTE(review): ``fmain`` is defined elsewhere in this file; presumably
    it switches away and appends the exception it is unwound with to the
    list it was given -- confirm against its definition.
    """
    seen = []
    g1 = greenlet(fmain)
    g2 = greenlet(fmain)
    g1.switch(seen)
    g2.switch(seen)
    # Nothing may have been recorded while both greenlets are still referenced.
    self.assertEqual(seen, [])

    # Drop the only local reference to the first greenlet, then force a
    # collection before inspecting the list.
    del g1
    gc.collect()
    self.assertEqual(seen, [greenlet.GreenletExit])

    # Same again for the second greenlet: one more entry is expected.
    del g2
    gc.collect()
    self.assertEqual(seen, [greenlet.GreenletExit, greenlet.GreenletExit])
def test_kill(self):
    """Check the result of calling ``throw()`` with no arguments on a greenlet.

    The greenlet is first switched into and must hand back ``"ok"``.
    ``throw()`` is then called twice without arguments; each call must
    return a ``greenlet.GreenletExit`` instance, and the greenlet must be
    reported as dead after the first call.

    NOTE(review): ``switch`` is a helper defined elsewhere in this file;
    presumably it switches to the parent greenlet with the given value --
    confirm against its definition.
    """
    def f():
        switch("ok")
        # Reaching the next line would hand "fail" back to the caller.
        switch("fail")

    g = greenlet(f)
    res = g.switch()
    self.assertEqual(res, "ok")

    res = g.throw()
    # assertIsInstance reports the actual type on failure, unlike
    # assertTrue(isinstance(...)), which only reports "False is not true".
    self.assertIsInstance(res, greenlet.GreenletExit)
    self.assertTrue(g.dead)

    res = g.throw()  # immediately eaten by the already-dead greenlet
    self.assertIsInstance(res, greenlet.GreenletExit)
if self.delta_count > self.max_delta_count:
self._finalize()
self.events['run_end'](self)
self._finished = True
raise Exception("Maximum number of delta cycles reached: {0}".format(self.max_delta_count))
# print('-----------------------------------------')
self.events['timestep_end'](self.time, self)
# All events have settled, let's advance time
if not self._advance_time():
self._finalize()
self.events['run_end'](self)
self._finished = True
raise greenlet.GreenletExit
raise greenlet.GreenletExit
raise Exception("Maximum number of delta cycles reached: {0}".format(self.max_delta_count))
if not (self._ready_pool or self.trig_pool):
self.events['delta_settled'](self)
# print('-----------------------------------------')
self.events['timestep_end'](self.time, self)
# All events have settled, let's advance time
if not self._advance_time():
self._finalize()
self.events['run_end'](self)
self._finished = True
raise greenlet.GreenletExit