Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@cocotb.coroutine
def transaction_data_out(self, addr, ep, data, chunk_size=64, expected=PID.ACK, datax=PID.DATA1):
    """Send ``data`` to endpoint ``ep`` one chunk at a time.

    For every chunk the endpoint response is set to ACK, the host-side
    transmission is forked, and the received data is checked with
    ``expect_data`` before the transmission is joined.

    Args:
        addr: Address forwarded to ``host_send``.
        ep: Endpoint identifier; its number is derived via
            ``EndpointType.epnum``.
        data: Payload to send (split by ``grouper_tofit`` -- presumably into
            pieces of at most ``chunk_size`` items; confirm against helper).
        chunk_size: Chunk size handed to ``grouper_tofit``.
        expected: Expected handshake, passed to both ``host_send`` and
            ``expect_data``.
        datax: Data PID used for the first chunk; it alternates between
            ``PID.DATA0`` and ``PID.DATA1`` after each chunk.
    """
    endpoint_number = EndpointType.epnum(ep)
    for piece in grouper_tofit(chunk_size, data):
        self.dut._log.warning("sending {} bytes to host on endpoint {}".format(len(piece), endpoint_number))

        # Enable receiving data
        yield self.set_response(ep, EndpointResponse.ACK)

        # Transmit in the background so the expectation can run concurrently.
        sender = cocotb.fork(self.host_send(datax, addr, ep, piece, expected))
        yield self.expect_data(endpoint_number, list(piece), expected)
        yield sender.join()

        # Toggle the data PID for the next chunk.
        datax = PID.DATA1 if datax == PID.DATA0 else PID.DATA0
@cocotb.coroutine
async def async_annotated(outcome):
    """Wait on ``Timer(1)``, then return the result of ``outcome.get()``.

    Whatever ``outcome.get()`` returns (or raises) is propagated to the
    caller unchanged.
    """
    await Timer(1)
    result = outcome.get()
    return result
@cocotb.coroutine
def host_expect_stall(self):
    """Expect a STALL handshake packet on the host side.

    Delegates to ``host_expect_packet`` with a handshake packet built from
    ``PID.STALL`` and a fixed failure message.
    """
    stall_packet = handshake_packet(PID.STALL)
    yield self.host_expect_packet(stall_packet, "Expected STALL packet.")
@cocotb.coroutine
def signal_mon(self, signal, idx, edge):
    """Count occurrences of ``edge`` on ``signal`` forever.

    Args:
        signal: Object passed to ``edge`` to build the trigger.
        idx: Index into ``self.monitor_edges`` whose counter is incremented.
        edge: Callable that builds a trigger from ``signal``; it is invoked
            anew on every iteration.
    """
    while True:
        # A fresh trigger is created for each wait, as in the original loop.
        trigger = edge(signal)
        yield trigger
        self.monitor_edges[idx] += 1
@cocotb.coroutine
def control_transfer_out(self, addr, setup_data, descriptor_data=None):
    """Begin a control OUT transfer on endpoint 0.

    Only the setup stage is present in this excerpt; ``descriptor_data`` and
    the computed endpoint addresses are not used by the visible statements.

    Args:
        addr: Address forwarded to ``transaction_setup``.
        setup_data: Setup packet bytes; bit 7 of the first byte must be clear.
        descriptor_data: Not referenced in the visible part of the function.

    Raises:
        Exception: If bit 7 of ``setup_data[0]`` is set (an IN request).
        TestFailure: If the ``usb_setup_ev_pending`` CSR is non-zero before
            the setup stage.
    """
    epaddr_out = EndpointType.epaddr(0, EndpointType.OUT)
    epaddr_in = EndpointType.epaddr(0, EndpointType.IN)

    direction_bit = setup_data[0] & 0x80
    if direction_bit == 0x80:
        raise Exception("setup_data indicated an IN transfer, but you requested an OUT transfer")

    pending_csr = self.csrs['usb_setup_ev_pending']
    setup_ev = yield self.read(pending_csr)
    if setup_ev != 0:
        raise TestFailure("setup_ev should be 0 at the start of the test, was: {:02x}".format(setup_ev))

    # Setup stage
    self.dut._log.info("setup stage")
    yield self.transaction_setup(addr, setup_data)

    # Re-read the pending flag after the setup transaction.
    setup_ev = yield self.read(self.csrs['usb_setup_ev_pending'])
def __init__(self, test_function, *args, **kwargs):
    """Store the test coroutine and its constant arguments.

    Args:
        test_function: A ``cocotb.coroutine`` instance or a native coroutine
            function.
        *args: Positional arguments stored on ``self.args``.
        **kwargs: Keyword arguments stored on ``self.kwargs_constant``.

    Raises:
        TypeError: If ``test_function`` is an async generator function, or
            is neither a ``cocotb.coroutine`` nor a coroutine function.
    """
    # The async-generator check is only attempted when the version test passes.
    can_detect_async_gen = sys.version_info > (3, 6)
    if can_detect_async_gen and inspect.isasyncgenfunction(test_function):
        message = (
            "Expected a coroutine function, but got the async generator '{}'. "
            "Did you forget to convert a `yield` to an `await`?"
        )
        raise TypeError(message.format(test_function.__qualname__))

    is_cocotb_coroutine = isinstance(test_function, cocotb.coroutine)
    if not is_cocotb_coroutine and not inspect.iscoroutinefunction(test_function):
        raise TypeError("TestFactory requires a cocotb coroutine")

    self.test_function = test_function
    self.name = self.test_function.__qualname__

    self.args = args
    self.kwargs_constant = kwargs
    self.kwargs = {}
    self.log = _logger
@cocotb.coroutine
def _cr_twiddler(self, generator=None):
    """Drive ``self._signal`` high and low according to a generator.

    Each item taken from the generator is an ``(on, off)`` pair: the signal
    is set to 1 for ``on`` rising edges of ``self._clk`` and then to 0 for
    ``off`` rising edges.

    Args:
        generator: Optional source of ``(on, off)`` pairs. When given it
            replaces ``self._generator``; otherwise the stored generator is
            used.

    Raises:
        Exception: If no generator is supplied and none is stored.
    """
    if generator is None and self._generator is None:
        raise Exception("No generator provided!")
    if generator is not None:
        self._generator = generator

    edge = RisingEdge(self._clk)

    # Actual thread
    while True:
        on, off = next(self._generator)
        self._signal <= 1
        for _ in range(on):
            yield edge
        self._signal <= 0
        for _ in range(off):
            # The loop body was missing in the source; waiting one clock edge
            # per "off" cycle mirrors the "on" phase above.
            yield edge
@cocotb.coroutine
def reset(self):
    """Software-reset the Nysa FPGA master.

    This may not actually reset the entire FPGA image.

    Args:
        Nothing

    Returns:
        Nothing

    Raises:
        NysaCommError: A failure of communication is detected
            (NOTE(review): not raised by the statements visible here --
            confirm against the rest of the implementation).
    """
    # Take the communication lock first; only the acquisition is visible in
    # this excerpt.
    yield self.comm_lock.acquire()
@cocotb.coroutine
def reset(self):
    """Assert reset on the bus and zero the input-side signals.

    Sequence: log, wait on ``Timer(0)``, drive ``self.rst`` to 1, wait
    ``self.reset_length`` cycles of ``self.dut.clk``, log again, then write 0
    to each of the listed signals in order.
    """
    self.dut.log.info("Sending Reset to the bus")
    yield Timer(0)

    self.rst <= 1
    yield ClockCycles(self.dut.clk, self.reset_length)
    self.dut.log.info("Reset high")

    # NOTE(review): de-asserting reset (``self.rst <= 0``) is disabled in
    # this sequence -- confirm where reset is released.
    signals_to_clear = (
        "in_ready",
        "in_reset",
        "out_ready",
        "in_command",
        "in_address",
        "in_data",
        "in_data_count",
    )
    for signal_name in signals_to_clear:
        # ``<=`` is the signal-assignment operator on the handle.
        getattr(self, signal_name) <= 0
@cocotb.coroutine
def dump_core(self):
    """Stub: performs no action and returns ``None``."""
    return None