Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test(dut):
dut.stream_in_data.setimmediatevalue(value)
yield Timer(1)
assert dut.stream_in_data.value == 0
yield ReadOnly()
def do_test_readwrite_in_readonly(dut):
global exited
yield RisingEdge(dut.clk)
yield ReadOnly()
yield ReadWrite()
exited = True
self.log.debug(
"ARADDR %d\n" % _araddr +
"ARLEN %d\n" % _arlen +
"ARSIZE %d\n" % _arsize +
"ARBURST %d\n" % _arburst +
"ARPROT %d\n" % _arprot +
"BURST_LENGTH %d\n" % burst_length +
"Bytes in beat %d\n" % bytes_in_beat)
burst_count = burst_length
yield clock_re
while True:
self.bus.RVALID <= 1
yield ReadOnly()
if self.bus.RREADY.value:
_burst_diff = burst_length - burst_count
_st = _araddr + (_burst_diff * bytes_in_beat)
_end = _araddr + ((_burst_diff + 1) * bytes_in_beat)
word.buff = self._memory[_st:_end].tobytes()
self.bus.RDATA <= word
if burst_count == 1:
self.bus.RLAST <= 1
yield clock_re
burst_count -= 1
self.bus.RLAST <= 0
if burst_count == 0:
break
def _respond(self):
"""Coroutine to respond to the actual requests."""
edge = RisingEdge(self.clock)
while True:
yield edge
self._do_response()
yield ReadOnly()
if self._readable and self.bus.read.value:
if not self._burstread:
self._pad()
addr = self.bus.address.value.integer
if addr not in self._mem:
self.log.warning("Attempt to read from uninitialized "
"address 0x%x", addr)
self._responses.append(True)
else:
self.log.debug("Read from address 0x%x returning 0x%x",
addr, self._mem[addr])
self._responses.append(self._mem[addr])
else:
addr = self.bus.address.value.integer
if addr % self.dataByteSize != 0:
Unless a coroutine has explicitly requested to be scheduled in ReadOnly
mode (for example wanting to sample the finally settled value after all
delta delays) then it can reasonably be expected to be scheduled during
"normal mode" i.e. where writes are permitted.
"""
_MODE_NORMAL = 1 # noqa
_MODE_READONLY = 2 # noqa
_MODE_WRITE = 3 # noqa
_MODE_TERM = 4 # noqa
# Singleton events, recycled to avoid spurious object creation
_next_time_step = NextTimeStep()
_read_write = ReadWrite()
_read_only = ReadOnly()
_timer1 = Timer(1)
def __init__(self):
self.log = SimLog("cocotb.scheduler")
if _debug:
self.log.setLevel(logging.DEBUG)
# Use OrderedDict here for deterministic behavior (gh-934)
# A dictionary of pending coroutines for each trigger,
# indexed by trigger
self._trigger2coros = _py_compat.insertion_ordered_dict()
# A dictionary mapping coroutines to the trigger they are waiting for
self._coro2trigger = _py_compat.insertion_ordered_dict()
def ping(self):
timeout_count = 0
while timeout_count < self.timeout:
yield RisingEdge(self.dut.clk)
timeout_count += 1
yield ReadOnly()
if self.master_ready.value.get_value() == 0:
continue
else:
break
if timeout_count == self.timeout:
cocotb.log.error("Timed out while waiting for master to be ready")
return
yield ReadWrite()
self.dut.in_ready <= 1
self.dut.in_command <= 0
self.dut.in_data <= 0
self.dut.in_address <= 0
self.dut.in_data_count <= 0
self.dut.out_ready <= 1
def _wait_ready(self):
"""Wait for a ready cycle on the bus before continuing.
Can no longer drive values this cycle...
FIXME assumes readyLatency of 0
"""
yield ReadOnly()
while not self.bus.ready.value:
yield RisingEdge(self.clock)
yield ReadOnly()
OPBException: If read took longer than 16 cycles.
"""
yield self._acquire_lock()
# Apply values for next clock edge
if sync:
yield RisingEdge(self.clock)
self.bus.ABus <= address
self.bus.select <= 1
self.bus.RNW <= 1
self.bus.BE <= 0xF
count = 0
while not int(self.bus.xferAck.value):
yield RisingEdge(self.clock)
yield ReadOnly()
if int(self.bus.toutSup.value):
count = 0
else:
count += 1
if count >= self._max_cycles:
raise OPBException("Read took longer than 16 cycles")
data = int(self.bus.DBus_out.value)
# Deassert read
self.bus.select <= 0
self._release_lock()
self.log.info("Read of address 0x%x returned 0x%08x" % (address, data))
return data
OPBException: If write took longer than 16 cycles.
"""
yield self._acquire_lock()
if sync:
yield RisingEdge(self.clock)
self.bus.ABus <= address
self.bus.select <= 1
self.bus.RNW <= 0
self.bus.BE <= 0xF
self.bus.DBus_out <= value
count = 0
while not int(self.bus.xferAck.value):
yield RisingEdge(self.clock)
yield ReadOnly()
if int(self.bus.toutSup.value):
count = 0
else:
count += 1
if count >= self._max_cycles:
raise OPBException("Write took longer than 16 cycles")
self.bus.select <= 0
self._release_lock()