Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
client = ModbusClient(method='rtu', port='/dev/ttyUSB0', baudrate=9600, timeout=1.5)
client.connect()
idslave = 0x01
if len(sys.argv) == 2:
try:
idslave = int(sys.argv[1])
except:
print ("usage: %s [idslave]" % sys.argv[0])
sys.exit(-1)
# get running mode
print ("modbus cmd: 0x03 value: 0x0000 length: 0x01\n")
result = client.read_holding_registers(address=0x0000, count=0x01, unit=idslave)
decoder = BinaryPayloadDecoder.fromRegisters(result.registers, byteorder=Endian.Big, wordorder=Endian.Big)
print (decoder.decode_16bit_int(), " (running mode)\n")
print ("")
# get loader version
print ("modbus cmd: 0x03 value: 0x0001 length: 0x02\n")
result = client.read_holding_registers(address=0x0001, count=0x02, unit=idslave)
decoder = BinaryPayloadDecoder.fromRegisters(result.registers, byteorder=Endian.Big, wordorder=Endian.Big)
x = decoder.decode_32bit_int();
print (''.join(chr((x>>8*(4-byte-1))&0xFF) for byte in range(4)) , " (software version)\n")
print ("")
client.close()
reg2_320 = client2.read_holding_registers(320,2,unit=71)
# Plenticore Register 322: Daily_yield [Wh]
# ist PV Tagesertrag
reg2_322 = client2.read_holding_registers(322,2,unit=71)
# Plenticore Register 324: Yearly_yield [Wh]
# ist PV Jahresertrag
reg2_324 = client2.read_holding_registers(324,2,unit=71)
# Plenticore Register 326: Monthly_yield [Wh]
# ist PV Monatsertrag
reg2_326 = client2.read_holding_registers(326,2,unit=71)
# ausgelesene Register WR 1 dekodieren
FRegister_100 = BinaryPayloadDecoder.fromRegisters(reg_100.registers, byteorder=Endian.Big, wordorder=Endian.Little)
FRegister_150 = BinaryPayloadDecoder.fromRegisters(reg_150.registers, byteorder=Endian.Big, wordorder=Endian.Little)
FRegister_220 = BinaryPayloadDecoder.fromRegisters(reg_220.registers, byteorder=Endian.Big, wordorder=Endian.Little)
FRegister_222 = BinaryPayloadDecoder.fromRegisters(reg_222.registers, byteorder=Endian.Big, wordorder=Endian.Little)
FRegister_224 = BinaryPayloadDecoder.fromRegisters(reg_224.registers, byteorder=Endian.Big, wordorder=Endian.Little)
FRegister_230 = BinaryPayloadDecoder.fromRegisters(reg_230.registers, byteorder=Endian.Big, wordorder=Endian.Little)
FRegister_232 = BinaryPayloadDecoder.fromRegisters(reg_232.registers, byteorder=Endian.Big, wordorder=Endian.Little)
FRegister_234 = BinaryPayloadDecoder.fromRegisters(reg_234.registers, byteorder=Endian.Big, wordorder=Endian.Little)
FRegister_240 = BinaryPayloadDecoder.fromRegisters(reg_240.registers, byteorder=Endian.Big, wordorder=Endian.Little)
FRegister_242 = BinaryPayloadDecoder.fromRegisters(reg_242.registers, byteorder=Endian.Big, wordorder=Endian.Little)
FRegister_244 = BinaryPayloadDecoder.fromRegisters(reg_244.registers, byteorder=Endian.Big, wordorder=Endian.Little)
FRegister_250 = BinaryPayloadDecoder.fromRegisters(reg_250.registers, byteorder=Endian.Big, wordorder=Endian.Little)
FRegister_252 = BinaryPayloadDecoder.fromRegisters(reg_252.registers, byteorder=Endian.Big, wordorder=Endian.Little)
FRegister_320 = BinaryPayloadDecoder.fromRegisters(reg_320.registers, byteorder=Endian.Big, wordorder=Endian.Little)
FRegister_322 = BinaryPayloadDecoder.fromRegisters(reg_322.registers, byteorder=Endian.Big, wordorder=Endian.Little)
FRegister_324 = BinaryPayloadDecoder.fromRegisters(reg_324.registers, byteorder=Endian.Big, wordorder=Endian.Little)
FRegister_326 = BinaryPayloadDecoder.fromRegisters(reg_326.registers, byteorder=Endian.Big, wordorder=Endian.Little)
FRegister_514 = BinaryPayloadDecoder.fromRegisters(reg_514.registers, byteorder=Endian.Big, wordorder=Endian.Little)
FRegister_575 = BinaryPayloadDecoder.fromRegisters(reg_575.registers, byteorder=Endian.Big, wordorder=Endian.Little)
FRegister_582 = BinaryPayloadDecoder.fromRegisters(reg_582.registers, byteorder=Endian.Big, wordorder=Endian.Little)
unitId = int(nUnitList[idx])
else:
unitId = 1
self._log.debug("register # :" + str(register) + ", qty #: " + str(qty) + ", unit #: " + str(unitId))
try:
self.rVal = self._con.read_holding_registers(register-1,qty,unit=unitId)
assert(self.rVal.function_code < 0x80)
except Exception as e:
self._log.error("Connection failed on read of register: " +str(register) + " : " + str(e))
self._modcon = False
else:
#self._log.debug("register value:" + str(self.rVal.registers)+" type= " + str(type(self.rVal.registers)))
#f = f + self.rVal.registers
decoder = BinaryPayloadDecoder.fromRegisters(self.rVal.registers, endian=Endian.Big)
self._log.debug("register type: " + str(rType))
if rType == "uint16":
rValD = decoder.decode_16bit_uint()
t = emonhub_coder.encode('H',rValD)
f = f + list(t)
elif rType == "uint32":
rValD = decoder.decode_32bit_uint()
t = emonhub_coder.encode('I',rValD)
f = f + list(t)
elif rType == "uint64":
rValD = decoder.decode_64bit_uint()
t = emonhub_coder.encode('Q',rValD)
f = f + list(t)
elif rType == "int16":
rValD = decoder.decode_16bit_int()
result = None
if data_sent.get("functionCode") == 1 or data_sent.get("functionCode") == 2:
result = input_data.bits
log.debug(result)
if "registerCount" in data_sent:
result = result[:data_sent["registerCount"]]
else:
result = result[0]
elif data_sent.get("functionCode") == 3 or data_sent.get("functionCode") == 4:
result = input_data.registers
byte_order = data_sent.get("byteOrder", "LITTLE")
reg_count = data_sent.get("registerCount", 1)
type_of_data = data_sent["type"]
try:
if byte_order == "LITTLE":
decoder = BinaryPayloadDecoder.fromRegisters(result, byteorder=Endian.Little)
elif byte_order == "BIG":
decoder = BinaryPayloadDecoder.fromRegisters(result, byteorder=Endian.Big)
else:
log.warning("byte order is not BIG or LITTLE")
continue
except Exception as e:
log.error(e)
if type_of_data == "string":
result = decoder.decode_string(2 * reg_count)
elif type_of_data == "long":
if reg_count == 1:
result = decoder.decode_16bit_int()
elif reg_count == 2:
result = decoder.decode_32bit_int()
elif reg_count == 4:
result = decoder.decode_64bit_int()
# Word Order - Big Byte Order - Big
# word1 =0x1234 word2 = 0x5678
# Word Order - Big Byte Order - Little
# word1 =0x3412 word2 = 0x7856
# Word Order - Little Byte Order - Big
# word1 = 0x5678 word2 = 0x1234
# Word Order - Little Byte Order - Little
# word1 =0x7856 word2 = 0x3412
# +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ #
# ----------------------------------------------------------------------- #
builder = BinaryPayloadBuilder(byteorder=Endian.Big,
wordorder=Endian.Little)
builder.add_string('abcdefgh')
builder.add_bits([0, 1, 0, 1, 1, 0, 1, 0])
builder.add_8bit_int(-0x12)
builder.add_8bit_uint(0x12)
builder.add_16bit_int(-0x5678)
builder.add_16bit_uint(0x1234)
builder.add_32bit_int(-0x1234)
builder.add_32bit_uint(0x12345678)
builder.add_32bit_float(22.34)
builder.add_32bit_float(-22.34)
builder.add_64bit_int(-0xDEADBEEF)
builder.add_64bit_uint(0x12345678DEADBEEF)
builder.add_64bit_uint(0x12345678DEADBEEF)
builder.add_64bit_float(123.45)
builder.add_64bit_float(-123.45)
payload = builder.to_registers()
for value in values:
if value[1] is not None:
if value[4] == 'U16':
result = client.read_holding_registers(value[1], 1, unit=mb_id)
elif value[4] in ('U32', 'I32'):
result = client.read_holding_registers(value[1], 2, unit=mb_id)
if result.isError():
if debug == 1:
print('Could not read register {} ({})'.format(value[1], value[4]))
# Default value for not readable values
decoded_value = 0
decoded_value_str = '0'
else:
decoder = BinaryPayloadDecoder.fromRegisters(result.registers,
byteorder=Endian.Big,
wordorder=Endian.Big)
# Check the datatype (default is unsigned int 32 bit)
if value[4] == 'U16':
decoded_value_raw = decoder.decode_16bit_uint()
elif value[4] == 'I32':
decoded_value_raw = decoder.decode_32bit_int()
else:
decoded_value_raw = decoder.decode_32bit_uint()
# Mulitply the raw value with the corresponding multiplier to get the correct unit
decoded_value = decoded_value_raw*value[2]
# Fill the value dictionary with values
value_dict[value[0]] = [decoded_value, ]
# Generate the value strings for the ramdisk file according to the specified precision
for value in values:
value_dict[value[0]].append('{:.{prec}f}'.format(value_dict[value[0]][0], prec=value[3]))
import getopt
import socket
import ConfigParser
import struct
import binascii
import logging
from pymodbus.payload import BinaryPayloadDecoder
from pymodbus.constants import Endian
from pymodbus.client.sync import ModbusTcpClient
client = ModbusTcpClient('192.168.193.125', port=8899)
sdmid = int(85)
time.sleep(0.1)
#reg bat volt
resp = client.read_holding_registers(0x0100,2, unit=sdmid)
decoder = BinaryPayloadDecoder.fromRegisters(resp.registers,byteorder=Endian.Big,wordorder=Endian.Big)
voltr = int(decoder.decode_16bit_int())
#voltr = resp.registers[0]
#print('volt'+str(voltr))
time.sleep(0.1)
#reg battamp
resp = client.read_holding_registers(0x0101,2, unit=sdmid)
decoder = BinaryPayloadDecoder.fromRegisters(resp.registers,byteorder=Endian.Big,wordorder=Endian.Big)
battcur = int(decoder.decode_16bit_int())
#print('battcur'+str(battcur))
volt = voltr
amp = battcur
#print(volt)
#print(amp)
#print(amp)
battwatt = float(volt * amp * -1 / 100)
battwatt = int(battwatt)
decoder = BinaryPayloadDecoder.fromRegisters(result.registers, byteorder=Endian.Big, wordorder=Endian.Big)
print (decoder.decode_32bit_float(), " Volt (internal)\n")
print ("")
print ("0x03 0x0004\n")
result = client.read_holding_registers(address=0x0004, count=0x02, unit=idslave)
decoder = BinaryPayloadDecoder.fromRegisters(result.registers, byteorder=Endian.Big, wordorder=Endian.Big)
print (decoder.decode_32bit_float(), " Celsius (internal)\n")
print ("")
print ("0x04 0x0000\n")
result = client.read_input_registers(address=0x0000, count=0x01, unit=idslave)
decoder = BinaryPayloadDecoder.fromRegisters(result.registers, byteorder=Endian.Big, wordorder=Endian.Big)
print (decoder.decode_16bit_int(), " maximum devices \n")
print ("")
print ("0x04 0x0001\n")
result = client.read_input_registers(address=0x0001, count=0x01, unit=idslave)
decoder = BinaryPayloadDecoder.fromRegisters(result.registers, byteorder=Endian.Big, wordorder=Endian.Big)
value = decoder.decode_16bit_int()
print (value, " devices found\n")
# iterate sensors
for idx in range(0, value):
print (" id: %i 0x04 0x%04x" % (idx, 0x0100+idx))
result = client.read_input_registers(address=0x0100+idx, count=0x04, unit=idslave)
decoder = BinaryPayloadDecoder.fromRegisters(result.registers, byteorder=Endian.Big, wordorder=Endian.Big)
decoder = BinaryPayloadDecoder.fromRegisters(resp.registers,byteorder=Endian.Big,wordorder=Endian.Big)
w1 = str(decoder.decode_16bit_int())
resp= client.read_holding_registers(2601,1,unit=modbid)
decoder = BinaryPayloadDecoder.fromRegisters(resp.registers,byteorder=Endian.Big,wordorder=Endian.Big)
w2 = str(decoder.decode_16bit_int())
resp= client.read_holding_registers(2602,1,unit=modbid)
decoder = BinaryPayloadDecoder.fromRegisters(resp.registers,byteorder=Endian.Big,wordorder=Endian.Big)
w3 = str(decoder.decode_16bit_int())
watt = int(w1) + int(w2) + int(w3)
f = open('/var/www/html/openWB/ramdisk/wattbezug', 'w')
f.write(str(watt))
f.close()
#grid ampere
resp= client.read_holding_registers(2617,1,unit=modbid)
decoder = BinaryPayloadDecoder.fromRegisters(resp.registers,byteorder=Endian.Big,wordorder=Endian.Big)
a1 = str(decoder.decode_16bit_int())
a1 = float(a1) / 10
resp= client.read_holding_registers(2619,1,unit=modbid)
decoder = BinaryPayloadDecoder.fromRegisters(resp.registers,byteorder=Endian.Big,wordorder=Endian.Big)
a2 = str(decoder.decode_16bit_int())
a2 = float(a2) / 10
resp= client.read_holding_registers(2621,1,unit=modbid)
decoder = BinaryPayloadDecoder.fromRegisters(resp.registers,byteorder=Endian.Big,wordorder=Endian.Big)
a3 = str(decoder.decode_16bit_int())
a3 = float(a3) / 10
f = open('/var/www/html/openWB/ramdisk/bezuga1', 'w')
f.write(str(a1))
f.close()
f = open('/var/www/html/openWB/ramdisk/bezuga2', 'w')
f.write(str(a2))
f.close()
def convert(self, config, data):
byte_order = config["byteOrder"] if config.get("byteOrder") else "LITTLE"
if byte_order == "LITTLE":
builder = BinaryPayloadBuilder(byteorder=Endian.Little)
elif byte_order == "BIG":
builder = BinaryPayloadBuilder(byteorder=Endian.Big)
else:
log.warning("byte order is not BIG or LITTLE")
return
reg_count = config.get("registerCount", 1)
value = config["value"]
if config.get("tag") is not None:
tags = (findall('[A-Z][a-z]*', config["tag"]))
if "Coil" in tags:
builder.add_bits(value)
elif "String" in tags:
builder.add_string(value)
elif "Double" in tags:
if reg_count == 4:
builder.add_64bit_float(value)