Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_annotated_async_from_coro(dut):
"""
Test that normal coroutines are able to call async functions annotated
with `@cocotb.coroutine`
"""
v = yield produce.async_annotated(Value(1))
assert v == 1
try:
yield produce.async_annotated(Error(SomeException))
except SomeException:
pass
else:
assert False
async def test_annotated_async_from_async(dut):
""" Test that async coroutines are able to call raw async functions """
v = await produce.async_(Value(1))
assert v == 1
try:
await produce.async_(Error(SomeException))
except SomeException:
pass
else:
assert False
def add_test(self, test_coro):
"""Called by the regression manager to queue the next test"""
if self._test is not None:
raise InternalError("Test was added while another was in progress")
self._test = test_coro
self._resume_coro_upon(
test_coro,
NullTrigger(name="Start {!s}".format(test_coro), outcome=outcomes.Value(None))
)
Args:
outcome: The :any:`outcomes.Outcome` object to resume with.
Returns:
The object yielded from the coroutine
Raises:
CoroutineComplete: If the coroutine returns or throws an error, self._outcome is set, and
:exc:`CoroutineComplete` is thrown.
"""
try:
self._started = True
return outcome.send(self._coro)
except ReturnValue as e:
self._outcome = outcomes.Value(e.retval)
raise CoroutineComplete()
except StopIteration as e:
self._outcome = outcomes.Value(e.value)
raise CoroutineComplete()
except BaseException as e:
self._outcome = outcomes.Error(remove_traceback_frames(e, ['_advance', 'send']))
raise CoroutineComplete()
def schedule(self, coroutine, trigger=None):
"""Schedule a coroutine by calling the send method.
Args:
coroutine (cocotb.decorators.coroutine): The coroutine to schedule.
trigger (cocotb.triggers.Trigger): The trigger that caused this
coroutine to be scheduled.
"""
if trigger is None:
send_outcome = outcomes.Value(None)
else:
send_outcome = trigger._outcome
if _debug:
self.log.debug("Scheduling with {}".format(send_outcome))
coro_completed = False
try:
result = coroutine._advance(send_outcome)
if _debug:
self.log.debug("Coroutine %s yielded %s (mode %d)" %
(coroutine.__qualname__, str(result), self._mode))
except cocotb.decorators.CoroutineComplete:
if _debug:
self.log.debug("Coroutine {} completed with {}".format(
coroutine, coroutine._outcome
Returns:
The object yielded from the coroutine
Raises:
CoroutineComplete: If the coroutine returns or throws an error, self._outcome is set, and
:exc:`CoroutineComplete` is thrown.
"""
try:
self._started = True
return outcome.send(self._coro)
except ReturnValue as e:
self._outcome = outcomes.Value(e.retval)
raise CoroutineComplete()
except StopIteration as e:
self._outcome = outcomes.Value(e.value)
raise CoroutineComplete()
except BaseException as e:
self._outcome = outcomes.Error(remove_traceback_frames(e, ['_advance', 'send']))
raise CoroutineComplete()
async def _wait_callback(trigger, callback):
"""
Wait for a trigger, and call `callback` with the outcome of the await.
"""
trigger = cocotb.scheduler._trigger_from_any(trigger)
try:
ret = outcomes.Value(await trigger)
except BaseException as exc:
# hide this from the traceback
ret = outcomes.Error(remove_traceback_frames(exc, ['_wait_callback']))
callback(ret)
def capture(fn, *args, **kwargs):
""" Obtain an `Outcome` representing the result of a function call """
try:
return Value(fn(*args, **kwargs))
except BaseException as e:
e = remove_traceback_frames(e, ['capture'])
return Error(e)
def kill(self):
"""Kill a coroutine."""
if self._outcome is not None:
# already finished, nothing to kill
return
if _debug:
self.log.debug("kill() called on coroutine")
# todo: probably better to throw an exception for anyone waiting on the coroutine
self._outcome = outcomes.Value(None)
cocotb.scheduler.unschedule(self)