Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import sys
import logging
from pymodbus.constants import Endian
from pymodbus.payload import BinaryPayloadDecoder
from pymodbus.payload import BinaryPayloadBuilder
from pymodbus.client.sync import ModbusSerialClient as ModbusClient
# Script: read holding register 0x0000 (the device's running mode) from one
# Modbus RTU slave on /dev/ttyUSB0 and print it.
logging.basicConfig()
log = logging.getLogger()
log.setLevel(logging.INFO)

# The slave id defaults to 1; a single optional command-line argument
# overrides it. It is parsed before the port is opened so that a usage error
# never leaves the serial port open.
idslave = 0x01
if len(sys.argv) == 2:
    try:
        idslave = int(sys.argv[1])
    except ValueError:
        # Only a non-numeric argument is a usage error. A bare ``except``
        # would also intercept KeyboardInterrupt and SystemExit.
        print ("usage: %s [idslave]" % sys.argv[0])
        sys.exit(-1)

# create connection (boot mode is 9600)
client = ModbusClient(method='rtu', port='/dev/ttyUSB0', baudrate=9600, timeout=1.5)
if not client.connect():
    sys.exit("unable to open serial port /dev/ttyUSB0")

try:
    # get running mode
    print ("modbus cmd: 0x03 value: 0x0000 length: 0x01\n")
    result = client.read_holding_registers(address=0x0000, count=0x01, unit=idslave)
    if result.isError():
        # Error responses carry no ``registers`` attribute, so report the
        # failure instead of dying with an AttributeError below.
        sys.exit("modbus read failed: %s" % result)
    decoder = BinaryPayloadDecoder.fromRegisters(result.registers, byteorder=Endian.Big, wordorder=Endian.Big)
    print (decoder.decode_16bit_int(), " (running mode)\n")
finally:
    # Release the serial port on every exit path.
    client.close()
# Connect to Modbus network
# pylint: disable=import-error
if client_type == 'serial':
from pymodbus.client.sync import ModbusSerialClient as ModbusClient
client = ModbusClient(method=config[DOMAIN][CONF_METHOD],
port=config[DOMAIN][CONF_PORT],
baudrate=config[DOMAIN][CONF_BAUDRATE],
stopbits=config[DOMAIN][CONF_STOPBITS],
bytesize=config[DOMAIN][CONF_BYTESIZE],
parity=config[DOMAIN][CONF_PARITY],
timeout=config[DOMAIN][CONF_TIMEOUT])
elif client_type == 'rtuovertcp':
from pymodbus.client.sync import ModbusTcpClient as ModbusClient
from pymodbus.transaction import ModbusRtuFramer as ModbusFramer
client = ModbusClient(host=config[DOMAIN][CONF_HOST],
port=config[DOMAIN][CONF_PORT],
framer=ModbusFramer,
timeout=config[DOMAIN][CONF_TIMEOUT])
elif client_type == 'tcp':
from pymodbus.client.sync import ModbusTcpClient as ModbusClient
client = ModbusClient(host=config[DOMAIN][CONF_HOST],
port=config[DOMAIN][CONF_PORT],
timeout=config[DOMAIN][CONF_TIMEOUT])
elif client_type == 'udp':
from pymodbus.client.sync import ModbusUdpClient as ModbusClient
client = ModbusClient(host=config[DOMAIN][CONF_HOST],
port=config[DOMAIN][CONF_PORT],
timeout=config[DOMAIN][CONF_TIMEOUT])
else:
return False
def init_trovis(self):
    """Create the Modbus client for the Trovis device and try to connect.

    The client is built from the instance's ``_modbus_mode``,
    ``_modbus_port``, ``_modbus_timeout`` and ``_modbus_speed`` settings.
    The outcome is recorded in ``self._connected`` and logged at debug level.

    Returns:
        The client object once it has been constructed (even if
        ``connect()`` reported failure or raised), or ``None`` when
        constructing the client itself raised an exception.
    """
    # Bound up front so the final return is valid even when the constructor
    # raises; that path used to end in an UnboundLocalError.
    connection = None
    try:
        # TODO: self._modbus_debug = False
        connection = ModbusClient(method=self._modbus_mode, port=self._modbus_port, timeout=self._modbus_timeout, baudrate=self._modbus_speed)
        if connection.connect():
            self._connected = True
            self.logger.debug('Verbindung zur Trovis hergestellt: ' + str(connection))
        else:
            self._connected = False
            self.logger.debug('Verbindung zur Trovis fehlgeschlagen: ' + str(connection))
    except Exception as e:
        # A failed setup must not leave a stale "connected" flag behind.
        self._connected = False
        self.logger.debug('Exception beim Trovis Verbindungsaufbau: ' + str(e))
    return connection
# This code is not doing conversion from rpm to hz, but afraid to
# touch it. We will do the conversion on production hardware and
# leave the emulator code alone.
result = client.write_registers(
self._register_map['speed_rpm'],
[speed_rpm],
unit=self.unit)
if result is None:
raise SynseException(
'No response received for GS3-2010 fan control.')
elif isinstance(result, ExceptionResponse):
raise SynseException('RS485 Exception: {}'.format(result))
# in all cases, read out / compute the fan_speed from RS485
with ModbusClient(method=self.method, port=self.device_name,
timeout=self.timeout) as client:
# read speed
result = client.read_holding_registers(
self._register_map['speed_rpm'],
count=1,
unit=self.unit)
if result is None:
raise SynseException(
'No response received for GS3-2010 fan control.')
elif isinstance(result, ExceptionResponse):
raise SynseException('RS485 Exception: {}'.format(result))
# create client and read registers, composing a reading to return
return {
const.UOM_VAPOR_FAN: result.registers[0],
#!/usr/bin/python
from pymodbus.client.sync import ModbusSerialClient
# Script: print 7 holding registers starting at address 1000 from unit 1.
client = ModbusSerialClient(method="rtu", port="/dev/virtualcom1", baudrate=9600,
                            stopbits=1, bytesize=8, timeout=1)
try:
    rq = client.read_holding_registers(1000, 7, unit=1)
    if rq.isError():
        # Error responses have no ``registers`` attribute; exit with a clear
        # message (and non-zero status) instead of an AttributeError.
        raise SystemExit("modbus read failed: %s" % rq)
    print(rq.registers)
finally:
    # Release the serial port on every exit path.
    client.close()
def _direct_sensor_read(self):
""" Internal method for reading data off of the SHT31 Humidity device.
Returns:
dict: the temperature and humidity reading values.
"""
with self._lock:
if self.hardware_type == 'emulator':
with ModbusClient(method=self.method, port=self.device_name,
timeout=self.timeout) as client:
# read temperature
result = client.read_holding_registers(
self._register_map['temperature_register'],
count=1,
unit=self.unit)
if result is None:
raise SynseException(
'No response received for SHT31 temperature reading.')
elif isinstance(result, ExceptionResponse):
raise SynseException('RS485 Exception: {}'.format(result))
temperature = conversions.temperature_sht31_int(result.registers[0])
# read humidity
def __init__(self, unit=1, serialclient=None, **kwargs):
    ''' Initialize a serial client instance

    :param unit: Modbus unit id stored on the instance as ``self.unit``
    :param serialclient: an existing client to reuse as ``self.client``;
        when None a new RTU ModbusClient is created instead
    :param kwargs: options forwarded to ModbusClient when a new client is
        created; ``port`` defaults to '/dev/ttyXRUSB0' and ``baudrate``
        to 115200
    '''
    self.unit = unit
    if serialclient is None:
        # Forward the caller's options as real keyword arguments. They used
        # to be passed as one keyword literally named ``kwargs``, so the
        # caller's options never reached the client as arguments.
        options = dict(kwargs)
        options.setdefault('port', '/dev/ttyXRUSB0')
        options.setdefault('baudrate', 115200)
        # The framing mode is always RTU, as before, regardless of kwargs.
        options['method'] = 'rtu'
        self.client = ModbusClient(**options)
    else:
        self.client = serialclient
#!/usr/bin/python
import sys
import os
import time
import getopt
import socket
import ConfigParser
import struct
import binascii
from pymodbus.client.sync import ModbusSerialClient


def _read_float(modbus_client, address, unit):
    """Read two input registers at *address* and decode them as one float.

    The two 16-bit registers are packed big-endian and reinterpreted as a
    32-bit float, then rounded to one decimal place via "%.1f" formatting.
    Exits the script with a message when the device returns an error
    response, because such a response has no ``registers`` attribute.
    """
    response = modbus_client.read_input_registers(address, 2, unit=unit)
    if response.isError():
        raise SystemExit("modbus read of input register 0x%02x failed: %s"
                         % (address, response))
    raw = struct.unpack('>f', struct.pack('>HH', *response.registers))[0]
    return float("%.1f" % raw)


def _write_value(path, value):
    """Overwrite *path* with str(value), closing the file even on error."""
    with open(path, 'w') as handle:
        handle.write(str(value))


# Usage: <script> <serial-port> <modbus-unit-id>
if len(sys.argv) < 3:
    raise SystemExit("usage: %s <serial-port> <modbus-unit-id>" % sys.argv[0])

seradd = str(sys.argv[1])
client = ModbusSerialClient(method = "rtu", port=seradd, baudrate=9600,
                            stopbits=1, bytesize=8, timeout=1)
idadd = int(sys.argv[2])

try:
    # Input register 0x00 -> ramdisk file "llv1".
    voltage = _read_float(client, 0x00, idadd)
    _write_value('/var/www/html/openWB/ramdisk/llv1', voltage)

    # Input register 0x06 -> ramdisk file "lla1".
    lla1 = _read_float(client, 0x06, idadd)
    _write_value('/var/www/html/openWB/ramdisk/lla1', lla1)
finally:
    # Release the serial port on every exit path.
    client.close()
def setUp(self):
    """Prepare the fixture: base setUp, diagslave command, ASCII client.

    After the parent class's setUp has run, the diagslave command line
    (ASCII mode, /dev/pts/14) is handed to ``self.initialize`` --
    presumably this launches the reference slave; confirm against
    ``initialize``. An ASCII-mode client on /dev/pts/13 is then stored as
    ``self.client`` and connected.
    """
    super(Runner, self).setUp()
    # alternative launcher kept for reference: "../tools/nullmodem/linux/run",
    slave_command = ["../tools/reference/diagslave", "-m", "ascii", "/dev/pts/14"]
    self.initialize(slave_command)
    client_options = {'method': 'ascii', 'timeout': 0.2, 'port': '/dev/pts/13'}
    self.client = ModbusClient(**client_options)
    self.client.connect()