Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
processwrapper.check_output(cmd)
@Driver.check_active
@step()
def cycle(self):
if self.cmd_cycle is not None:
cmd = shlex.split(self.cmd_cycle)
processwrapper.check_output(cmd)
else:
self.off()
time.sleep(self.delay)
self.on()
@target_factory.reg_driver
@attr.s(eq=False)
class NetworkPowerDriver(Driver, PowerResetMixin, PowerProtocol):
"""NetworkPowerDriver - Driver using a networked power switch to control a target's power"""
bindings = {"port": NetworkPowerPort, }
delay = attr.ib(default=2.0, validator=attr.validators.instance_of(float))
def __attrs_post_init__(self):
super().__attrs_post_init__()
# TODO: allow backends to register models with other names
self.backend = import_module(
".power.{}".format(self.port.model),
__package__
)
self._host = None
self._port = None
def on_activate(self):
if not line or not line.startswith(b' '):
continue
prefix, status = line.strip().split(b':', 1)
if not prefix == b"Port %d" % self.hub.index:
continue
status = status.split()
if b"power" in status:
return True
if b"off" in status:
return False
raise ExecutionError("Did not find port status in uhubctl output ({})".format(repr(output)))
@target_factory.reg_driver
@attr.s(eq=False)
class PDUDaemonDriver(Driver, PowerResetMixin, PowerProtocol):
"""PDUDaemonDriver - Driver using a PDU port available via pdudaemon"""
bindings = {"port": "PDUDaemonPort", }
delay = attr.ib(default=5, validator=attr.validators.instance_of(int))
def __attrs_post_init__(self):
super().__attrs_post_init__()
self._requests = import_module('requests')
self._host = None
self._port = None
def _build_url(self, cmd):
res = "http://{}:{}/power/control/{}?hostname={}&port={}".format(
self._host, self._port, cmd, self.port.pdu, self.port.index)
if cmd == 'reboot':
res += "&delay={}".format(self.delay)
return res
args = args.split(" ")
args.insert(0, '-i')
args.append("--protocol-decoder-samplenum")
args.append("-l")
args.append("4")
combined = self._get_sigrok_prefix() + args
output = subprocess.check_output(combined)
return [
match.groupdict()
for match in re.finditer(annotation_regex, output.decode("utf-8"))
]
@target_factory.reg_driver
@attr.s(eq=False)
class SigrokPowerDriver(SigrokCommon, PowerResetMixin, PowerProtocol):
"""The SigrokPowerDriverDriver uses sigrok-cli to control a PSU and collect
measurements.
Args:
bindings (dict): driver to use with sigrok
"""
bindings = {
"sigrok": {SigrokUSBSerialDevice, NetworkSigrokUSBSerialDevice},
}
delay = attr.ib(default=3.0, validator=attr.validators.instance_of(float))
max_voltage = attr.ib(
default=None,
converter=attr.converters.optional(float),
validator=attr.validators.optional(attr.validators.instance_of(float)),
)
max_current = attr.ib(
self.target.interact(
"Turn the target {name} OFF and press enter".
format(name=self.name)
)
@Driver.check_active
@step()
def cycle(self):
self.target.interact(
"CYCLE the target {name} and press enter".format(name=self.name)
)
@target_factory.reg_driver
@attr.s(eq=False)
class ExternalPowerDriver(Driver, PowerResetMixin, PowerProtocol):
"""ExternalPowerDriver - Driver using an external command to control a target's power"""
cmd_on = attr.ib(validator=attr.validators.instance_of(str))
cmd_off = attr.ib(validator=attr.validators.instance_of(str))
cmd_cycle = attr.ib(
default=None,
validator=attr.validators.optional(attr.validators.instance_of(str))
)
delay = attr.ib(default=2.0, validator=attr.validators.instance_of(float))
@Driver.check_active
@step()
def on(self):
cmd = shlex.split(self.cmd_on)
processwrapper.check_output(cmd)
@Driver.check_active
self.backend.power_set(self._host, self._port, self.port.index, False)
@Driver.check_active
@step()
def cycle(self):
self.off()
time.sleep(self.delay)
self.on()
@Driver.check_active
def get(self):
return self.backend.power_get(self._host, self._port, self.port.index)
@target_factory.reg_driver
@attr.s(eq=False)
class DigitalOutputPowerDriver(Driver, PowerResetMixin, PowerProtocol):
"""
DigitalOutputPowerDriver uses a DigitalOutput to control the power
of a DUT.
"""
bindings = {"output": DigitalOutputProtocol, }
delay = attr.ib(default=1.0, validator=attr.validators.instance_of(float))
def __attrs_post_init__(self):
super().__attrs_post_init__()
@Driver.check_active
@step()
def on(self):
self.output.set(True)
@Driver.check_active
@target_factory.reg_driver
@attr.s(eq=False)
class FakeFileTransferDriver(Driver, FileTransferProtocol):
@Driver.check_active
def get(self, *args):
pass
@Driver.check_active
def put(self, *args):
pass
@target_factory.reg_driver
@attr.s(eq=False)
class FakePowerDriver(Driver, PowerProtocol):
@Driver.check_active
def on(self, *args):
pass
@Driver.check_active
def off(self, *args):
pass
@Driver.check_active
def cycle(self, *args):
pass
class StrategyError(Exception):
msg = attr.ib(validator=attr.validators.instance_of(str))
class Status(enum.Enum):
unknown = 0
flashed_xload = 1
flashed = 2
@target_factory.reg_driver
@attr.s(eq=False)
class QuartusHPSStrategy(Strategy):
"""QuartusHPSStrategy - Strategy to flash QSPI via 'Quartus Prime Programmer and Tools'"""
bindings = {
"power": PowerProtocol,
"quartushps": QuartusHPSDriver,
"serial": SerialDriver,
}
image = attr.ib(validator=attr.validators.instance_of(str))
image_xload = attr.ib(validator=attr.validators.instance_of(str))
status = attr.ib(default=Status.unknown)
def __attrs_post_init__(self):
super().__attrs_post_init__()
@step(args=['status'])
def transition(self, status, *, step):
if not isinstance(status, Status):
status = Status[status]
if status == Status.unknown:
from ..step import step
from .common import Strategy, StrategyError
class Status(enum.Enum):
unknown = 0
off = 1
shell = 2
@target_factory.reg_driver
@attr.s(eq=False)
class ShellStrategy(Strategy):
"""ShellStrategy - Strategy to switch to shell"""
bindings = {
"power": PowerProtocol,
"shell": ShellDriver,
}
status = attr.ib(default=Status.unknown)
def __attrs_post_init__(self):
super().__attrs_post_init__()
@step(args=['status'])
def transition(self, status, *, step):
if not isinstance(status, Status):
status = Status[status]
if status == Status.unknown:
raise StrategyError("can not transition to {}".format(status))
elif status == self.status:
step.skip("nothing to do")
self.pykush.set_port_state(self.port.index, self.pykush_mod.YKUSH_PORT_STATE_DOWN)
@Driver.check_active
@step()
def cycle(self):
self.off()
time.sleep(self.delay)
self.on()
@Driver.check_active
def get(self):
return self.pykush.get_port_state(self.port.index)
@target_factory.reg_driver
@attr.s(eq=False)
class USBPowerDriver(Driver, PowerResetMixin, PowerProtocol):
"""USBPowerDriver - Driver using a power switchable USB hub and the uhubctl
tool (https://github.com/mvp/uhubctl) to control a target's power"""
bindings = {"hub": {"USBPowerPort", "NetworkUSBPowerPort"}, }
delay = attr.ib(default=2.0, validator=attr.validators.instance_of(float))
def __attrs_post_init__(self):
super().__attrs_post_init__()
if self.target.env:
self.tool = self.target.env.config.get_tool('uhubctl') or 'uhubctl'
else:
self.tool = 'uhubctl'
def _switch(self, cmd):
cmd = self.hub.command_prefix + [
self.tool,
from .common import Strategy, StrategyError
class Status(enum.Enum):
unknown = 0
off = 1
barebox = 2
shell = 3
@target_factory.reg_driver
@attr.s(eq=False)
class BareboxStrategy(Strategy):
"""BareboxStrategy - Strategy to switch to barebox or shell"""
bindings = {
"power": PowerProtocol,
"barebox": BareboxDriver,
"shell": ShellDriver,
}
status = attr.ib(default=Status.unknown)
def __attrs_post_init__(self):
super().__attrs_post_init__()
@step(args=['status'])
def transition(self, status, *, step):
if not isinstance(status, Status):
status = Status[status]
if status == Status.unknown:
raise StrategyError("can not transition to {}".format(status))
elif status == self.status: