Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def stop(self):
self.status = Thread.TERMINATED
def __init__(self, ql, status=1):
self.ql = ql
self.id = Thread.ID
Thread.ID += 1
self.context = Context(ql)
self.status = status
self.waitforthreads = []
self.tls = {}
self.tls_index = 0
def do_schedule(self):
if self.current_thread.is_stop() or self.ins_count % ThreadManager.TIME_SLICE == 0:
if len(self.threads) <= 1:
return
else:
for i in range(1, len(self.threads)):
next_id = (self.current_thread.id + i) % len(self.threads)
next_thread = self.threads[next_id]
# find next thread
if next_thread.status == Thread.RUNNING and (not next_thread.has_waitfor()):
if self.current_thread.is_stop():
pass
else:
self.current_thread.suspend()
next_thread.resume()
self.current_thread = next_thread
break