How to use the cocotb.triggers.ReadOnly function in cocotb

To help you get started, we’ve selected a few cocotb examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github cocotb / cocotb / tests / test_cases / issue_768_a / issue_768.py View on Github external
def test(dut):
    dut.stream_in_data.setimmediatevalue(value)
    yield Timer(1)
    assert dut.stream_in_data.value == 0
    yield ReadOnly()
github cocotb / cocotb / tests / test_cases / test_cocotb / test_timing_triggers.py View on Github external
def do_test_readwrite_in_readonly(dut):
    global exited
    yield RisingEdge(dut.clk)
    yield ReadOnly()
    yield ReadWrite()
    exited = True
github cocotb / cocotb / cocotb / drivers / amba.py View on Github external
self.log.debug(
                    "ARADDR  %d\n" % _araddr +
                    "ARLEN   %d\n" % _arlen +
                    "ARSIZE  %d\n" % _arsize +
                    "ARBURST %d\n" % _arburst +
                    "ARPROT %d\n" % _arprot +
                    "BURST_LENGTH %d\n" % burst_length +
                    "Bytes in beat %d\n" % bytes_in_beat)

            burst_count = burst_length

            yield clock_re

            while True:
                self.bus.RVALID <= 1
                yield ReadOnly()
                if self.bus.RREADY.value:
                    _burst_diff = burst_length - burst_count
                    _st = _araddr + (_burst_diff * bytes_in_beat)
                    _end = _araddr + ((_burst_diff + 1) * bytes_in_beat)
                    word.buff = self._memory[_st:_end].tobytes()
                    self.bus.RDATA <= word
                    if burst_count == 1:
                        self.bus.RLAST <= 1
                yield clock_re
                burst_count -= 1
                self.bus.RLAST <= 0
                if burst_count == 0:
                    break
github cocotb / cocotb / cocotb / drivers / avalon.py View on Github external
def _respond(self):
        """Coroutine to respond to the actual requests."""
        edge = RisingEdge(self.clock)
        while True:
            yield edge
            self._do_response()

            yield ReadOnly()

            if self._readable and self.bus.read.value:
                if not self._burstread:
                    self._pad()
                    addr = self.bus.address.value.integer
                    if addr not in self._mem:
                        self.log.warning("Attempt to read from uninitialized "
                                         "address 0x%x", addr)
                        self._responses.append(True)
                    else:
                        self.log.debug("Read from address 0x%x returning 0x%x",
                                       addr, self._mem[addr])
                        self._responses.append(self._mem[addr])
                else:
                    addr = self.bus.address.value.integer
                    if addr % self.dataByteSize != 0:
github cocotb / cocotb / cocotb / scheduler.py View on Github external
Unless a coroutine has explicitly requested to be scheduled in ReadOnly
    mode (for example wanting to sample the finally settled value after all
    delta delays) then it can reasonably be expected to be scheduled during
    "normal mode" i.e. where writes are permitted.
    """

    _MODE_NORMAL   = 1  # noqa
    _MODE_READONLY = 2  # noqa
    _MODE_WRITE    = 3  # noqa
    _MODE_TERM     = 4  # noqa

    # Singleton events, recycled to avoid spurious object creation
    _next_time_step = NextTimeStep()
    _read_write = ReadWrite()
    _read_only = ReadOnly()
    _timer1 = Timer(1)

    def __init__(self):

        self.log = SimLog("cocotb.scheduler")
        if _debug:
            self.log.setLevel(logging.DEBUG)

        # Use OrderedDict here for deterministic behavior (gh-934)

        # A dictionary of pending coroutines for each trigger,
        # indexed by trigger
        self._trigger2coros = _py_compat.insertion_ordered_dict()

        # A dictionary mapping coroutines to the trigger they are waiting for
        self._coro2trigger = _py_compat.insertion_ordered_dict()
github CospanDesign / nysa / nysa / host / sim / sim_host.py View on Github external
def ping(self):
        timeout_count       =  0

        while timeout_count < self.timeout:
            yield RisingEdge(self.dut.clk)
            timeout_count   += 1
            yield ReadOnly()
            if self.master_ready.value.get_value() == 0:
                continue
            else:
                break

        if timeout_count == self.timeout:
            cocotb.log.error("Timed out while waiting for master to be ready")
            return

        yield ReadWrite()
        self.dut.in_ready       <=  1
        self.dut.in_command     <=  0
        self.dut.in_data        <=  0
        self.dut.in_address     <=  0
        self.dut.in_data_count  <=  0
        self.dut.out_ready      <=  1
github cocotb / cocotb / cocotb / drivers / avalon.py View on Github external
def _wait_ready(self):
        """Wait for a ready cycle on the bus before continuing.

            Can no longer drive values this cycle...

            FIXME assumes readyLatency of 0
        """
        yield ReadOnly()
        while not self.bus.ready.value:
            yield RisingEdge(self.clock)
            yield ReadOnly()
github cocotb / cocotb / cocotb / drivers / opb.py View on Github external
OPBException: If read took longer than 16 cycles.
        """
        yield self._acquire_lock()

        # Apply values for next clock edge
        if sync:
            yield RisingEdge(self.clock)
        self.bus.ABus <= address
        self.bus.select <= 1
        self.bus.RNW <= 1
        self.bus.BE <= 0xF

        count = 0
        while not int(self.bus.xferAck.value):
            yield RisingEdge(self.clock)
            yield ReadOnly()
            if int(self.bus.toutSup.value):
                count = 0
            else:
                count += 1
            if count >= self._max_cycles:
                raise OPBException("Read took longer than 16 cycles")
        data = int(self.bus.DBus_out.value)

        # Deassert read
        self.bus.select <= 0
        self._release_lock()
        self.log.info("Read of address 0x%x returned 0x%08x" % (address, data))
        return data
github cocotb / cocotb / cocotb / drivers / opb.py View on Github external
OPBException: If write took longer than 16 cycles.
        """
        yield self._acquire_lock()

        if sync:
            yield RisingEdge(self.clock)
        self.bus.ABus <= address
        self.bus.select <= 1
        self.bus.RNW <= 0
        self.bus.BE <= 0xF
        self.bus.DBus_out <= value

        count = 0
        while not int(self.bus.xferAck.value):
            yield RisingEdge(self.clock)
            yield ReadOnly()
            if int(self.bus.toutSup.value):
                count = 0
            else:
                count += 1
            if count >= self._max_cycles:
                raise OPBException("Write took longer than 16 cycles")

        self.bus.select <= 0
        self._release_lock()