Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""The U-Boot Module contains the UBootDriver"""
import logging
import re
import attr
from pexpect import TIMEOUT
from ..factory import target_factory
from ..protocol import CommandProtocol, ConsoleProtocol, LinuxBootProtocol
from ..util import gen_marker
from ..step import step
from .common import Driver
from .commandmixin import CommandMixin
@target_factory.reg_driver
@attr.s(eq=False)
class UBootDriver(CommandMixin, Driver, CommandProtocol, LinuxBootProtocol):
"""UBootDriver - Driver to control uboot via the console.
UBootDriver binds on top of a ConsoleProtocol.
Args:
prompt (str): The default UBoot Prompt
password (str): optional password to unlock UBoot
init_commands (Tuple[str]): a tuple of commands to run after unlock
interrupt (str): interrupt character to use to go to prompt
password_prompt (str): string to detect the password prompt
boot_expression (str): string to search for on UBoot start
bootstring (str): string that indicates that the Kernel is booting
boot_command (str): optional boot command to boot target
login_timeout (int): optional, timeout for login prompt detection
# pylint: disable=no-member
import attr
from ..factory import target_factory
from ..step import step
from .common import Driver
from ..util.managedfile import ManagedFile
from ..util.helper import processwrapper
@target_factory.reg_driver
@attr.s(eq=False)
class AndroidFastbootDriver(Driver):
bindings = {
"fastboot": {"AndroidFastboot", "NetworkAndroidFastboot"},
}
image = attr.ib(default=None)
sparse_size = attr.ib(
default=None,
validator=attr.validators.optional(attr.validators.instance_of(str))
)
def __attrs_post_init__(self):
super().__attrs_post_init__()
# FIXME make sure we always have an environment or config
if self.target.env:
# pylint: disable=no-member
import subprocess
import attr
from .common import Driver
from ..factory import target_factory
from ..exceptions import InvalidConfigError
@target_factory.reg_driver
@attr.s(eq=False)
class USBVideoDriver(Driver):
bindings = {
"video": {"USBVideo", "NetworkUSBVideo"},
}
def get_caps(self):
match = (self.video.vendor_id, self.video.model_id)
if match == (0x046d, 0x082d):
return ("mid", [
("low", "video/x-h264,width=640,height=360,framerate=5/1"),
("mid", "video/x-h264,width=1280,height=720,framerate=15/2"),
("high", "video/x-h264,width=1920,height=1080,framerate=10/1"),
])
if match == (0x046d, 0x0892):
return ("mid", [
# pylint: disable=no-member
from importlib import import_module
from decimal import Decimal
import attr
from .common import Driver
from ..factory import target_factory
from ..exceptions import InvalidConfigError
from ..util.agentwrapper import AgentWrapper, b2s, s2b
@target_factory.reg_driver
@attr.s(eq=False)
class USBTMCDriver(Driver):
bindings = {
"tmc": {"USBTMC", "NetworkUSBTMC"},
}
def __attrs_post_init__(self):
    """Reset runtime state and derive the device index from the bound path.

    Raises:
        AssertionError: if ``self.tmc.path`` does not start with
            ``/dev/usbtmc`` or is not exactly ``/dev/usbtmc<index>``.
        ValueError: if the text after the prefix is not a base-10 integer.
    """
    super().__attrs_post_init__()
    self.wrapper = None
    self.backend = None
    device_path = self.tmc.path
    assert device_path.startswith('/dev/usbtmc')
    # Skip the 11 characters of '/dev/usbtmc'; the remainder is the index.
    self.index = int(device_path[11:], 10)
    # Round-trip check: rejects suffixes that int() tolerates but that do
    # not reproduce the original path when formatted back.
    assert device_path == '/dev/usbtmc' + str(self.index)
def on_activate(self):
from importlib import import_module
import attr
from ..factory import target_factory
from ..protocol import DigitalOutputProtocol
from ..util.proxy import proxymanager
from .common import Driver
from .exception import ExecutionError
@target_factory.reg_driver
@attr.s(eq=False)
class ModbusCoilDriver(Driver, DigitalOutputProtocol):
bindings = {"coil": "ModbusTCPCoil", }
def __attrs_post_init__(self):
    """Load the Modbus client module by name and start with no client."""
    super().__attrs_post_init__()
    # Resolved via import_module at construction time instead of a
    # module-level import statement.
    self._module = import_module('pyModbusTCP.client')
    # Replaced by a ModbusClient instance in on_activate().
    self.client = None
def on_activate(self):
    """Create the Modbus client for the bound coil resource.

    Host and port are obtained from ``proxymanager.get_host_and_port``,
    with 502 passed as the default port.
    """
    # we can only forward if the backend knows which port to use
    endpoint = proxymanager.get_host_and_port(self.coil, default_port=502)
    address, tcp_port = endpoint
    self.client = self._module.ModbusClient(
        host=address,
        port=int(tcp_port),
        auto_open=True,
        auto_close=True,
    )
def on_deactivate(self):
# pylint: disable=no-member
import subprocess
import re
import attr
from ..factory import target_factory
from ..step import step
from .common import Driver
from .exception import ExecutionError
from ..util.helper import processwrapper
from ..util.managedfile import ManagedFile
@target_factory.reg_driver
@attr.s(eq=False)
class QuartusHPSDriver(Driver):
    """Driver bound to an Altera USB Blaster resource.

    Attributes set here:
        image: optional string; validated as ``str`` when not ``None``.
        tool: value returned by the environment config's
            ``get_tool('quartus_hps')``, or the literal ``'quartus_hps'``
            when no environment is available or the lookup returns a
            falsy value.
    """
    bindings = {
        "interface": {"AlteraUSBBlaster", "NetworkAlteraUSBBlaster"},
    }

    image = attr.ib(
        default=None,
        validator=attr.validators.optional(attr.validators.instance_of(str))
    )

    def __attrs_post_init__(self):
        """Resolve ``self.tool``, always leaving the attribute assigned."""
        super().__attrs_post_init__()
        # FIXME make sure we always have an environment or config
        env = self.target.env
        # Previously self.tool stayed undefined without an environment,
        # so any later access raised AttributeError. Fall back to the
        # plain tool name in that case as well.
        configured = env.config.get_tool('quartus_hps') if env else None
        self.tool = configured or 'quartus_hps'
"""
Class for connecting to a docker daemon running on the host machine.
"""
import logging
import attr
from labgrid.factory import target_factory
from labgrid.driver.common import Driver
from labgrid.resource.docker import DockerConstants
from labgrid.protocol.powerprotocol import PowerProtocol
@target_factory.reg_driver
@attr.s(eq=False)
class DockerDriver(PowerProtocol, Driver):
"""The DockerDriver is used to create docker containers.
This is done via communication with a docker daemon.
When a container is created the container is labeled with a
cleanup strategy identifier. Currently only one strategy is
implemented. This strategy simply deletes all labgrid created
containers before each test run. This is to ensure cleanup of
dangling containers from crashed tests or hanging containers.
Image pruning is not done by the driver.
For detailed information about the arguments see the
"Docker SDK for Python" documentation
https://docker-py.readthedocs.io/en/stable/containers.html#container-objects
from labgrid.step import step
from labgrid.strategy import Strategy
@attr.s(eq=False)
class StrategyError(Exception):
    """Exception carrying a single string message in ``msg``."""
    # attrs field; the validator only accepts str instances.
    msg = attr.ib(validator=attr.validators.instance_of(str))
class Status(enum.Enum):
    """Values for ``ExampleStrategy.status``; ``unknown`` is its default."""
    unknown = 0
    barebox = 1
    shell = 2
@target_factory.reg_driver
@attr.s(eq=False)
class ExampleStrategy(Strategy):
    """ExampleStrategy - Strategy for the usbpower labgrid example"""
    # Binding name -> protocol or driver class.
    bindings = {
        "power": PowerProtocol,
        "sdmux": USBSDMuxDriver,
        "barebox": BareboxDriver,
        "shell": ShellDriver,
    }

    # Defaults to Status.unknown.
    status = attr.ib(default=Status.unknown)

    def __attrs_post_init__(self):
        # No extra initialisation; only delegates to the parent hook.
        super().__attrs_post_init__()
@step(args=['status'])
from labgrid.step import step
from labgrid.strategy.common import Strategy
@attr.s(eq=False)
class StrategyError(Exception):
    """Exception whose only field, ``msg``, must be a string."""
    # attrs field; instance_of(str) rejects any non-str value.
    msg = attr.ib(validator=attr.validators.instance_of(str))
class Status(enum.Enum):
    """Values for ``QuartusHPSStrategy.status``; ``unknown`` is its default."""
    unknown = 0
    flashed_xload = 1
    flashed = 2
@target_factory.reg_driver
@attr.s(eq=False)
class QuartusHPSStrategy(Strategy):
    """QuartusHPSStrategy - Strategy to flash QSPI via 'Quartus Prime Programmer and Tools'"""
    # Binding name -> protocol or driver class.
    bindings = {
        "power": PowerProtocol,
        "quartushps": QuartusHPSDriver,
        "serial": SerialDriver,
    }

    # Both image fields are required (no default) and must be str.
    image = attr.ib(validator=attr.validators.instance_of(str))
    image_xload = attr.ib(validator=attr.validators.instance_of(str))
    # Defaults to Status.unknown.
    status = attr.ib(default=Status.unknown)

    def __attrs_post_init__(self):
        # No extra initialisation; only delegates to the parent hook.
        super().__attrs_post_init__()
"""All GPIO-related drivers"""
import attr
from ..factory import target_factory
from ..protocol import DigitalOutputProtocol
from ..resource.remote import NetworkSysfsGPIO
from ..step import step
from .common import Driver
from ..util.agentwrapper import AgentWrapper
@target_factory.reg_driver
@attr.s(eq=False)
class GpioDigitalOutputDriver(Driver, DigitalOutputProtocol):
bindings = {
"gpio": {"SysfsGPIO", "NetworkSysfsGPIO"},
}
def __attrs_post_init__(self):
    """Run the parent initialiser and start with ``wrapper`` unset."""
    super().__attrs_post_init__()
    # Starts as None; nothing is created at construction time.
    self.wrapper = None
def on_activate(self):
if isinstance(self.gpio, NetworkSysfsGPIO):
host = self.gpio.host
else:
host = None