Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def off(self):
    """Run the configured ``cmd_off`` command.

    ``self.cmd_off`` is tokenised with ``shlex.split`` and the resulting
    argument list is passed to ``processwrapper.check_output``. The value
    returned by that call is discarded.
    """
    processwrapper.check_output(shlex.split(self.cmd_off))
@Driver.check_active
@step()
def cycle(self):
    """Power-cycle via ``cmd_cycle`` or, if unset, via off/wait/on.

    When ``self.cmd_cycle`` is ``None`` the cycle is emulated by calling
    ``self.off()``, sleeping for ``self.delay`` seconds and calling
    ``self.on()``. Otherwise ``self.cmd_cycle`` is split with
    ``shlex.split`` and run through ``processwrapper.check_output``.
    """
    if self.cmd_cycle is None:
        # No dedicated cycle command configured: emulate it.
        self.off()
        time.sleep(self.delay)
        self.on()
        return
    processwrapper.check_output(shlex.split(self.cmd_cycle))
@target_factory.reg_driver
@attr.s(eq=False)
class NetworkPowerDriver(Driver, PowerResetMixin, PowerProtocol):
    """NetworkPowerDriver - Driver using a networked power switch to control a target's power"""

    bindings = {"port": NetworkPowerPort}

    # Must be a float (validated); defaults to 2.0.
    delay = attr.ib(
        default=2.0,
        validator=attr.validators.instance_of(float),
    )

    def __attrs_post_init__(self):
        """Import the backend module matching the bound port's model.

        The module is looked up as ``.power.<model>`` relative to this
        package and stored in ``self.backend``. ``self._host`` and
        ``self._port`` are initialised to ``None``.
        """
        super().__attrs_post_init__()
        # TODO: allow backends to register models with other names
        backend_module = ".power.{}".format(self.port.model)
        self.backend = import_module(backend_module, __package__)
        self._host = None
        self._port = None
import logging
import re
import shlex
import attr
from pexpect import TIMEOUT
from ..factory import target_factory
from ..protocol import CommandProtocol, ConsoleProtocol, LinuxBootProtocol
from ..step import step
from ..util import gen_marker, Timeout
from .common import Driver
from .commandmixin import CommandMixin
@target_factory.reg_driver
@attr.s(eq=False)
class BareboxDriver(CommandMixin, Driver, CommandProtocol, LinuxBootProtocol):
    """BareboxDriver - Driver to control barebox via the console.

    BareboxDriver binds on top of a ConsoleProtocol.

    Args:
        prompt (str): The default Barebox Prompt (empty string if not set)
        autoboot (str): defaults to "stop autoboot"; presumably the console
            text announcing the autoboot countdown - TODO confirm
        interrupt (str): defaults to a single newline; presumably what is
            sent to interrupt autoboot - TODO confirm
        bootstring (str): string that indicates that the Kernel is booting
        password (str): optional, password to use for access to the shell
        login_timeout (int): optional, timeout for access to the shell
    """

    bindings = {"console": ConsoleProtocol}

    # Every field below is validated to be a ``str``.
    prompt = attr.ib(
        default="",
        validator=attr.validators.instance_of(str),
    )
    autoboot = attr.ib(
        default="stop autoboot",
        validator=attr.validators.instance_of(str),
    )
    interrupt = attr.ib(
        default="\n",
        validator=attr.validators.instance_of(str),
    )
    bootstring = attr.ib(
        default=r"Linux version \d",
        validator=attr.validators.instance_of(str),
    )
ifname (str): name of the interface"""
ifname = attr.ib(default=None)
@target_factory.reg_resource
# eq=False keeps identity-based __eq__/__hash__, matching the other
# attrs-decorated resource and driver classes here. With the attrs default
# (eq=True) a field-wise __eq__ is generated and __hash__ is set to None,
# which makes instances unhashable (unusable in sets or as dict keys).
@attr.s(eq=False)
class EthernetPort(Resource):
    """The basic EthernetPort describes a switch and interface

    Args:
        switch (str): name of the switch
        interface (str): name of the interface"""

    # Both attributes are optional and default to None; no validator is
    # attached, so the documented ``str`` type is not enforced here.
    switch = attr.ib(default=None)
    interface = attr.ib(default=None)
@target_factory.reg_resource
@attr.s(eq=False)
class SysfsGPIO(Resource):
    """The basic SysfsGPIO contains an index

    Args:
        index (int): index of target gpio line."""

    # NOTE(review): the None default does not satisfy instance_of(int), and
    # attrs runs validators on default values as well, so omitting ``index``
    # raises TypeError at construction. Confirm that making ``index``
    # effectively mandatory this way is intended.
    index = attr.ib(
        default=None,
        validator=attr.validators.instance_of(int),
    )
@step(title='call', args=['args'])
def _call(self, *args):
    """Start the tool as a subprocess with the given extra arguments.

    The command line is ``self.sigrok.command_prefix`` followed by
    ``self.tool``, then ``-C <channels>`` when ``self.sigrok.channels`` is
    truthy, then ``args``. stdin, stdout and stderr are all piped.

    The ``subprocess.Popen`` object is stored in ``self._process``;
    nothing is returned and the process is not waited for here.
    """
    combined = self.sigrok.command_prefix + [self.tool]
    channels = self.sigrok.channels
    if channels:
        combined.extend(["-C", channels])
    combined.extend(args)
    self.log.debug("Combined command: %s", combined)
    self._process = subprocess.Popen(
        combined,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
@target_factory.reg_driver
@attr.s(eq=False)
class SigrokDriver(SigrokCommon):
"""The SigrokDriver uses sigrok-cli to record samples and expose them as python dictionaries.
Args:
bindings (dict): driver to use with sigrok
"""
bindings = {
"sigrok": {SigrokUSBDevice, NetworkSigrokUSBDevice, SigrokDevice},
}
@Driver.check_active
def capture(self, filename, samplerate="200k"):
self._filename = filename
self._basename = os.path.basename(self._filename)
self.log.debug(
# pylint: disable=no-member
import logging
import os
from time import time
import attr
from ..factory import target_factory
from ..step import step
from .common import Driver
@target_factory.reg_driver
@attr.s(eq=False)
class USBStorageDriver(Driver):
bindings = {"storage": {"USBMassStorage", "USBSDMuxDevice"}, }
def __attrs_post_init__(self):
super().__attrs_post_init__()
self.logger = logging.getLogger("{}:{}".format(self, self.target))
def on_activate(self):
pass
def on_deactivate(self):
pass
@step(args=['filename'])
def write_image(self, filename):
for k, v_new in attrs.items():
# check for attr converters
attrib = getattr(fields, k)
if attrib.converter:
v_new = attrib.converter(v_new)
v_old = getattr(resource, k)
setattr(resource, k, v_new)
if v_old != v_new:
changes.append((k, v_old, v_new))
if changes:
self.logger.debug("changed attributes for %s:", resource)
for k, v_old, v_new in changes:
self.logger.debug(" %s: %s -> %s", k, v_old, v_new)
@target_factory.reg_resource
@attr.s(eq=False)
class RemotePlace(ManagedResource):
    """Managed resource whose manager class is ``RemotePlaceManager``."""

    manager_cls = RemotePlaceManager

    def __attrs_post_init__(self):
        """Set ``timeout`` to 10.0, then run the parent post-init hook."""
        # Assigned before delegating to the parent hook; keep this order.
        self.timeout = 10.0
        super().__attrs_post_init__()
@attr.s(eq=False)
class RemoteUSBResource(NetworkResource, ManagedResource):
    """Network-side USB resource managed by ``RemotePlaceManager``.

    None of the attributes declared here has a default. Each one is
    validated to be either ``None`` or of the stated type:

    Args:
        busnum (int): may be None
        devnum (int): may be None
        path (str): may be None
        vendor_id (int): may be None
    """

    manager_cls = RemotePlaceManager

    busnum = attr.ib(
        validator=attr.validators.optional(attr.validators.instance_of(int)),
    )
    devnum = attr.ib(
        validator=attr.validators.optional(attr.validators.instance_of(int)),
    )
    path = attr.ib(
        validator=attr.validators.optional(attr.validators.instance_of(str)),
    )
    vendor_id = attr.ib(
        validator=attr.validators.optional(attr.validators.instance_of(int)),
    )
# pylint: disable=no-member
import os.path
import logging
import attr
from ..resource import NetworkFlashrom
from ..factory import target_factory
from ..step import step
from ..protocol import BootstrapProtocol
from .common import Driver, check_file
from ..util.managedfile import ManagedFile
from ..util.helper import processwrapper
@target_factory.reg_driver
@attr.s(eq=False)
class FlashromDriver(Driver, BootstrapProtocol):
""" The Flashrom driver used the flashrom utility to write an image to a raw rom.
The driver is a pure wrapper of the flashrom utility"""
bindings = {
'flashrom_resource': {"Flashrom", NetworkFlashrom},
}
image = attr.ib(default=None)
def __attrs_post_init__(self):
super().__attrs_post_init__()
self.logger = logging.getLogger('{}'.format(self))
if self.target.env:
self.tool = self.target.env.config.get_tool('flashrom')
else:
if not switch:
resource.extra = None
continue
extra = {}
for mac, timestamp in switch.macs_by_port.get(resource.interface, {}).items():
extra.setdefault('macs', {})[mac] = {
'timestamp': timestamp,
'ips': self.neighbors.get(mac, []),
}
extra.update(switch.ports.get(resource.interface, {}))
if resource.extra != extra:
resource.extra = extra
self.logger.debug("new information for %s: %s", resource, extra)
@target_factory.reg_resource
# eq=False keeps identity-based __eq__/__hash__, matching the other
# attrs-decorated resource and driver classes here. With the attrs default
# (eq=True) a field-wise __eq__ is generated and __hash__ is set to None,
# which makes instances unhashable (unusable in sets or as dict keys).
@attr.s(eq=False)
class SNMPEthernetPort(ManagedResource):
    """SNMPEthernetPort describes an ethernet port which can be queried over
    SNMP.

    Args:
        switch (str): hostname of the switch to query
        interface (str): name of the interface to query
    """

    manager_cls = EthernetPortManager

    # Both attributes are mandatory (no default) and must be ``str``.
    switch = attr.ib(validator=attr.validators.instance_of(str))
    interface = attr.ib(validator=attr.validators.instance_of(str))

    def __attrs_post_init__(self):
        """Run the parent class's post-init hook."""
        super().__attrs_post_init__()
import attr
from ..driver import ShellDriver
from ..factory import target_factory
from ..protocol import PowerProtocol
from ..step import step
from .common import Strategy, StrategyError
class Status(enum.Enum):
    """States used by the shell strategy; ``unknown`` is its initial status."""

    unknown = 0
    off = 1
    shell = 2
@target_factory.reg_driver
@attr.s(eq=False)
class ShellStrategy(Strategy):
"""ShellStrategy - Strategy to switch to shell"""
bindings = {
"power": PowerProtocol,
"shell": ShellDriver,
}
status = attr.ib(default=Status.unknown)
def __attrs_post_init__(self):
super().__attrs_post_init__()
@step(args=['status'])
def transition(self, status, *, step):
if not isinstance(status, Status):