Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@Driver.check_active
@step()
def set(self, status):
out = self.true_repr if status else self.false_repr
with open(self.filepath, "w") as fdes:
fdes.write(out)
@Driver.check_active
@step(args=['filename'])
def load(self, filename=None):
if filename is None and self.image is not None:
filename = self.target.env.config.get_image_path(self.image)
mf = ManagedFile(filename, self.interface)
mf.sync_to_resource()
managed_configs = []
for config in self.config:
mconfig = ManagedFile(config, self.interface)
mconfig.sync_to_resource()
managed_configs.append(mconfig)
cmd = self.interface.command_prefix+[self.tool]
cmd += chain.from_iterable(("--search", path) for path in self.search)
@Driver.check_active
@step()
def set(self, value):
if self.signal == "dtr":
self._p.dtr = value
elif self.signal == "rts":
self._p.rts = value
# pylint: disable=no-member
import subprocess
import re
import attr
from ..factory import target_factory
from ..step import step
from .common import Driver
from .exception import ExecutionError
from ..util.helper import processwrapper
from ..util.managedfile import ManagedFile
@target_factory.reg_driver
@attr.s(eq=False)
class QuartusHPSDriver(Driver):
bindings = {
"interface": {"AlteraUSBBlaster", "NetworkAlteraUSBBlaster"},
}
image = attr.ib(
default=None,
validator=attr.validators.optional(attr.validators.instance_of(str))
)
def __attrs_post_init__(self):
super().__attrs_post_init__()
# FIXME make sure we always have an environment or config
if self.target.env:
self.tool = self.target.env.config.get_tool('quartus_hps') or 'quartus_hps'
else:
self.tool = 'quartus_hps'
from importlib import import_module
import attr
from ..factory import target_factory
from ..protocol import DigitalOutputProtocol
from ..util.proxy import proxymanager
from .common import Driver
from .exception import ExecutionError
@target_factory.reg_driver
@attr.s(eq=False)
class ModbusCoilDriver(Driver, DigitalOutputProtocol):
bindings = {"coil": "ModbusTCPCoil", }
def __attrs_post_init__(self):
super().__attrs_post_init__()
self._module = import_module('pyModbusTCP.client')
self.client = None
def on_activate(self):
# we can only forward if the backend knows which port to use
host, port = proxymanager.get_host_and_port(self.coil, default_port=502)
self.client = self._module.ModbusClient(
host=host, port=int(port), auto_open=True, auto_close=True)
def on_deactivate(self):
self.client = None
import subprocess
import tempfile
import attr
from ..factory import target_factory
from ..protocol import CommandProtocol, FileTransferProtocol
from .commandmixin import CommandMixin
from .common import Driver
from ..step import step
from .exception import ExecutionError
@target_factory.reg_driver
@attr.s(eq=False)
class SSHDriver(CommandMixin, Driver, CommandProtocol, FileTransferProtocol):
"""SSHDriver - Driver to execute commands via SSH"""
bindings = {"networkservice": "NetworkService", }
priorities = {CommandProtocol: 10, FileTransferProtocol: 10}
keyfile = attr.ib(default="", validator=attr.validators.instance_of(str))
stderr_merge = attr.ib(default=False, validator=attr.validators.instance_of(bool))
def __attrs_post_init__(self):
super().__attrs_post_init__()
self.logger = logging.getLogger("{}({})".format(self, self.target))
self._keepalive = None
def on_activate(self):
self.ssh_prefix = ["-o", "LogLevel=ERROR"]
if self.keyfile:
keyfile_path = self.keyfile
if self.target.env:
# pylint: disable=no-member
from importlib import import_module
from decimal import Decimal
import attr
from .common import Driver
from ..factory import target_factory
from ..exceptions import InvalidConfigError
from ..util.agentwrapper import AgentWrapper, b2s, s2b
@target_factory.reg_driver
@attr.s(eq=False)
class USBTMCDriver(Driver):
bindings = {
"tmc": {"USBTMC", "NetworkUSBTMC"},
}
def __attrs_post_init__(self):
super().__attrs_post_init__()
self.wrapper = None
self.backend = None
assert self.tmc.path.startswith('/dev/usbtmc')
self.index = int(self.tmc.path[11:], 10)
assert self.tmc.path == '/dev/usbtmc'+str(self.index)
def on_activate(self):
assert self.wrapper is None
self.wrapper = AgentWrapper(self.tmc.host)
@Driver.check_active
@step(args=['filename', 'address'])
def flash(self, filename=None, address=0x0):
if filename is None and self.image is not None:
filename = self.target.env.config.get_image_path(self.image)
mf = ManagedFile(filename, self.interface)
mf.sync_to_resource()
assert isinstance(address, int)
cable_number = self._get_cable_number()
cmd = self.interface.command_prefix + [self.tool]
cmd += [
"--cable={}".format(cable_number),
"--addr=0x{:X}".format(address),
"--operation=P {}".format(mf.get_remote_path()),
]
@Driver.check_active
@step(args=['cmd'])
def run(self, cmd: str, *, timeout: int = 30): # pylint: disable=unused-argument
return self._run(cmd, timeout=timeout)