Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self, **kwargs):
    """ Set up the base data carried by a modbus request.

    Each recognised keyword is optional; a missing one falls back to the
    default shown below.

    :param transaction: Stored as ``transaction_id``
                        (default ``Defaults.TransactionId``)
    :param protocol: Stored as ``protocol_id``
                     (default ``Defaults.ProtocolId``)
    :param unit: Stored as ``unit_id`` (default ``Defaults.UnitId``)
    :param skip_encode: Stored as ``skip_encode`` (default ``False``)
    """
    option = kwargs.get
    self.transaction_id = option('transaction', Defaults.TransactionId)
    self.protocol_id = option('protocol', Defaults.ProtocolId)
    self.unit_id = option('unit', Defaults.UnitId)
    self.skip_encode = option('skip_encode', False)
    # Always starts at zero; no keyword overrides it here.
    self.check = 0
def configure(self, config_dict, registry_config_str):
    """ Copy connection settings from a config mapping, then parse the registry.

    :param config_dict: Settings mapping. ``"device_address"`` must be
                        present; ``"slave_id"`` defaults to 0 and ``"port"``
                        defaults to ``Defaults.Port``.
    :param registry_config_str: Passed unchanged to ``self.parse_config``
    :raises KeyError: When a plain dict lacks ``"device_address"``
    """
    setting = config_dict.get
    self.slave_id = setting("slave_id", 0)
    # Indexed rather than .get(): a missing address raises instead of
    # silently falling back to a default.
    self.ip_address = config_dict["device_address"]
    self.port = setting("port", Defaults.Port)
    self.parse_config(registry_config_str)
def reactor_factory(host="127.0.0.1", port=Defaults.Port, framer=None,
                    source_address=None, timeout=None, **kwargs):
    """
    Placeholder for the twisted udp async client factory.

    The signature is in place, but no client is built: every call raises
    and none of the arguments are used.

    :param host: Host IP address
    :param port: Port
    :param framer: Modbus Framer
    :param source_address: Bind address
    :param timeout: Timeout in seconds
    :param kwargs: Extra keyword options (ignored)
    :raises NotImplementedError: Always
    """
    raise NotImplementedError()
def async_io_factory(host="127.0.0.1", port=Defaults.Port, framer=None,
source_address=None, timeout=None, **kwargs):
"""
Factory to create asyncio based async tcp clients
:param host: Host IP address
:param port: Port
:param framer: Modbus Framer
:param source_address: Bind address
:param timeout: Timeout in seconds
:param kwargs:
:return: asyncio event loop and tcp client
"""
import asyncio
from pymodbus.client.async.asyncio import init_tcp_client
loop = kwargs.get("loop") or asyncio.new_event_loop()
proto_cls = kwargs.get("proto_cls", None)
if not loop.is_running():
def execute(self, request):
''' Starts the producer to send the next request to
consumer.write(Frame(request))
'''
self.response = None
retries = Defaults.Retries
request.transaction_id = self.__getNextTID()
_logger.debug("Running transaction %d" % request.transaction_id)
while retries > 0:
try:
self.client.connect()
self.client._send(self.client.framer.buildPacket(request))
# I need to fix this to read the header and the result size,
# as this may not read the full result set, but right now
# it should be fine...
result = self.client._recv(1024)
self.client.framer.processIncomingPacket(result, self.__set_result)
break;
except socket.error, msg:
self.client.close()
_logger.debug("Transaction failed. (%s) " % msg)
:param stopbits: The number of stop bits to use
:param bytesize: The bytesize of the serial messages
:param parity: Which kind of parity to use
:param baudrate: The baud rate to use for the serial device
:param timeout: The timeout between serial requests (default 3s)
:param strict: Use Inter char timeout for baudrates <= 19200 (adhere
to modbus standards)
"""
self.method = method
self.socket = None
BaseModbusClient.__init__(self, self.__implementation(method, self),
**kwargs)
self.port = kwargs.get('port', 0)
self.stopbits = kwargs.get('stopbits', Defaults.Stopbits)
self.bytesize = kwargs.get('bytesize', Defaults.Bytesize)
self.parity = kwargs.get('parity', Defaults.Parity)
self.baudrate = kwargs.get('baudrate', Defaults.Baudrate)
self.timeout = kwargs.get('timeout', Defaults.Timeout)
self._strict = kwargs.get("strict", True)
self.last_frame_end = None
if self.method == "rtu":
if self.baudrate > 19200:
self.silent_interval = 1.75 / 1000 # ms
else:
self._t0 = float((1 + 8 + 2)) / self.baudrate
self.inter_char_timeout = 1.5 * self._t0
self.silent_interval = 3.5 * self._t0
self.silent_interval = round(self.silent_interval, 6)
def create_tcp_client(klass, host='127.0.0.1', port=Defaults.Port):
    """ Build a TCP modbus client for the given endpoint.

    :param klass: Callable invoked with the new handle to produce the client
    :param host: The host to connect to
    :param port: The port to connect to on that host
    :returns: The result of ``klass`` applied to the handle returned by
              ``LIB.modbus_new_tcp``
    """
    # The host string is encoded to bytes before it is handed to LIB.
    return klass(LIB.modbus_new_tcp(host.encode(), port))
:param framer: The framer strategy to use
:param identity: An optional identify structure
:param address: An optional (interface, port) to bind to.
:param handler: A handler for each client session; default is
ModbusDisonnectedRequestHandler
:param ignore_missing_slaves: True to not send errors on a request
to a missing slave
:param broadcast_enable: True to treat unit_id 0 as broadcast address,
False to treat 0 as any other unit_id
"""
self.threads = []
self.decoder = ServerDecoder()
self.framer = framer or ModbusSocketFramer
self.context = context or ModbusServerContext()
self.control = ModbusControlBlock()
self.address = address or ("", Defaults.Port)
self.handler = handler or ModbusDisconnectedRequestHandler
self.ignore_missing_slaves = kwargs.get('ignore_missing_slaves',
Defaults.IgnoreMissingSlaves)
self.broadcast_enable = kwargs.get('broadcast_enable',
Defaults.broadcast_enable)
if isinstance(identity, ModbusDeviceIdentification):
self.control.Identity.update(identity)
socketserver.ThreadingUDPServer.__init__(self,
self.address, self.handler)
# self._BaseServer__shutdown_request = True
False to treat 0 as any other unit_id
"""
self.active_connections = {}
self.loop = loop or asyncio.get_event_loop()
self.allow_reuse_address = allow_reuse_address
self.decoder = ServerDecoder()
self.framer = framer or ModbusSocketFramer
self.context = context or ModbusServerContext()
self.control = ModbusControlBlock()
self.address = address or ("", Defaults.Port)
self.handler = handler or ModbusConnectedRequestHandler
self.handler.server = self
self.ignore_missing_slaves = kwargs.get('ignore_missing_slaves',
Defaults.IgnoreMissingSlaves)
self.broadcast_enable = kwargs.get('broadcast_enable',
Defaults.broadcast_enable)
if isinstance(identity, ModbusDeviceIdentification):
self.control.Identity.update(identity)
self.serving = self.loop.create_future() # asyncio future that will be done once server has started
self.server = None # constructors cannot be declared async, so we have to defer the initialization of the server
if PYTHON_VERSION >= (3, 7):
# start_serving is new in version 3.7
self.server_factory = self.loop.create_server(lambda : self.handler(self),
*self.address,
reuse_address=allow_reuse_address,
reuse_port=allow_reuse_port,
backlog=backlog,
start_serving=not defer_start)
else:
self.server_factory = self.loop.create_server(lambda : self.handler(self),
:param parity: Which kind of parity to use
:param baudrate: The baud rate to use for the serial device
:param timeout: The timeout between serial requests (default 3s)
:param strict: Use Inter char timeout for baudrates <= 19200 (adhere
to modbus standards)
"""
self.method = method
self.socket = None
BaseModbusClient.__init__(self, self.__implementation(method, self),
**kwargs)
self.port = kwargs.get('port', 0)
self.stopbits = kwargs.get('stopbits', Defaults.Stopbits)
self.bytesize = kwargs.get('bytesize', Defaults.Bytesize)
self.parity = kwargs.get('parity', Defaults.Parity)
self.baudrate = kwargs.get('baudrate', Defaults.Baudrate)
self.timeout = kwargs.get('timeout', Defaults.Timeout)
self._strict = kwargs.get("strict", True)
self.last_frame_end = None
if self.method == "rtu":
if self.baudrate > 19200:
self.silent_interval = 1.75 / 1000 # ms
else:
self._t0 = float((1 + 8 + 2)) / self.baudrate
self.inter_char_timeout = 1.5 * self._t0
self.silent_interval = 3.5 * self._t0
self.silent_interval = round(self.silent_interval, 6)