Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def testLittleEndianPayloadDecoder(self):
    """ Decode each field of the little endian payload in turn """
    decoder = BinaryPayloadDecoder(
        self.little_endian_payload,
        byteorder=Endian.Little,
        wordorder=Endian.Little)
    # (expected value, zero-argument reader) pairs. The readers are
    # invoked strictly in the order listed, one decoder call per field.
    steps = [
        (1, decoder.decode_8bit_uint),
        (2, decoder.decode_16bit_uint),
        (3, decoder.decode_32bit_uint),
        (4, decoder.decode_64bit_uint),
        (-1, decoder.decode_8bit_int),
        (-2, decoder.decode_16bit_int),
        (-3, decoder.decode_32bit_int),
        (-4, decoder.decode_64bit_int),
        (1.25, decoder.decode_32bit_float),
        (6.25, decoder.decode_64bit_float),
        # skip_bytes is expected to return nothing
        (None, lambda: decoder.skip_bytes(2)),
        # the decoded string comes back as bytes, hence the .decode()
        ('test', lambda: decoder.decode_string(4).decode()),
        (self.bitstring, decoder.decode_bits),
    ]
    for expected, read in steps:
        self.assertEqual(expected, read())
def fromCoils(coils, endian=Endian.Little):
    """ Build a payload decoder from the result of reading
    a collection of coils from a modbus device.

    The coils are handled as a list of bit (boolean) values.

    :param coils: The list of coil results to wrap
    :param endian: The endianness of the payload
    :returns: An initialized BinaryPayloadDecoder
    :raises ParameterException: If coils is not a list
    """
    # Reject anything that is not a list up front.
    if not isinstance(coils, list):
        raise ParameterException('Invalid collection of coils supplied')
    packed = pack_bitstring(coils)
    return BinaryPayloadDecoder(packed, endian)
# - a 8 byte string 'abcdefgh'
# - a 32 bit float 22.34
# - a 16 bit unsigned int 0x1234
# - another 16 bit unsigned int which we will ignore
# - an 8 bit int 0x12
# - an 8 bit bitstring [0,1,0,1,1,0,1,0]
# ----------------------------------------------------------------------- #
# Read len(payload) holding registers from unit 1, starting at address 0.
# `payload` and `client` are defined outside this excerpt.
address = 0x0
count = len(payload)
result = client.read_holding_registers(address, count, unit=1)

# Print the raw register values under a simple banner.
rule = "-" * 60
for line in (rule, "Registers", rule):
    print(line)
print(result.registers)
print("\n")

# Wrap the registers in a decoder using little endian byte and word order.
decoder = BinaryPayloadDecoder.fromRegisters(
    result.registers, byteorder=Endian.Little, wordorder=Endian.Little)
decoded = OrderedDict([
('string', decoder.decode_string(8)),
('bits', decoder.decode_bits()),
('8int', decoder.decode_8bit_int()),
('8uint', decoder.decode_8bit_uint()),
('16int', decoder.decode_16bit_int()),
('16uint', decoder.decode_16bit_uint()),
('32int', decoder.decode_32bit_int()),
('32uint', decoder.decode_32bit_uint()),
('32float', decoder.decode_32bit_float()),
('32float2', decoder.decode_32bit_float()),
('64int', decoder.decode_64bit_int()),
('64uint', decoder.decode_64bit_uint()),
import sys
import os
import time
import getopt
import socket
import ConfigParser
import struct
import binascii
from pymodbus.payload import BinaryPayloadDecoder
from pymodbus.constants import Endian
# The device address is taken from the first command-line argument.
ipaddress = str(sys.argv[1])
from pymodbus.client.sync import ModbusTcpClient
client = ModbusTcpClient(ipaddress, port=1502)

# PV power: a 32-bit float read from two registers at address 100, unit 71.
resp = client.read_holding_registers(100, 2, unit=71)
FRegister = BinaryPayloadDecoder.fromRegisters(resp.registers, byteorder=Endian.Big, wordorder=Endian.Little)
pvwatt = int(FRegister.decode_32bit_float())
# The sign is inverted before the value is written out.
fpvwatt = pvwatt * -1
# A context manager closes the file even if the write raises.
with open('/var/www/html/openWB/ramdisk/pvwatt', 'w') as f:
    f.write(str(fpvwatt))

# PV kWh: a 32-bit float read from two registers at address 320, unit 71.
resp = client.read_holding_registers(320, 2, unit=71)
FRegister = BinaryPayloadDecoder.fromRegisters(resp.registers, byteorder=Endian.Big, wordorder=Endian.Little)
final = int(FRegister.decode_32bit_float())
with open('/var/www/html/openWB/ramdisk/pvkwh', 'w') as f:
    f.write(str(final))

# The same reading divided by 1000 (presumably Wh -> kWh; confirm units).
# NOTE(review): `final` is an int, so this is floor division on Python 2
# and true division on Python 3 -- confirm which result is intended.
pvkwhk = final / 1000
with open('/var/www/html/openWB/ramdisk/pvkwhk', 'w') as f:
    f.write(str(pvkwhk))
from pymodbus.payload import BinaryPayloadDecoder
from pymodbus.constants import Endian
# The device address is taken from the first command-line argument.
ipaddress = str(sys.argv[1])
from pymodbus.client.sync import ModbusTcpClient
client = ModbusTcpClient(ipaddress, port=1502)

# PV power: a 32-bit float read from two registers at address 172, unit 71.
resp = client.read_holding_registers(172, 2, unit=71)
FRegister = BinaryPayloadDecoder.fromRegisters(resp.registers, byteorder=Endian.Big, wordorder=Endian.Little)
pvwatt = int(FRegister.decode_32bit_float())
# The sign is inverted before the value is written out.
fpvwatt = pvwatt * -1
# A context manager closes the file even if the write raises.
with open('/var/www/html/openWB/ramdisk/pvwatt', 'w') as f:
    f.write(str(fpvwatt))

# EVU "A" values 1-3 (presumably per-phase grid current in amperes; confirm).
# Each is a 32-bit float rounded to two decimals before being written.
resp = client.read_holding_registers(222, 2, unit=71)
FRegister = BinaryPayloadDecoder.fromRegisters(resp.registers, byteorder=Endian.Big, wordorder=Endian.Little)
final = round(FRegister.decode_32bit_float(), 2)
with open('/var/www/html/openWB/ramdisk/bezuga1', 'w') as f:
    f.write(str(final))

resp = client.read_holding_registers(232, 2, unit=71)
FRegister = BinaryPayloadDecoder.fromRegisters(resp.registers, byteorder=Endian.Big, wordorder=Endian.Little)
final = round(FRegister.decode_32bit_float(), 2)
with open('/var/www/html/openWB/ramdisk/bezuga2', 'w') as f:
    f.write(str(final))

resp = client.read_holding_registers(242, 2, unit=71)
FRegister = BinaryPayloadDecoder.fromRegisters(resp.registers, byteorder=Endian.Big, wordorder=Endian.Little)
final = round(FRegister.decode_32bit_float(), 2)
# Previously this handle was written to but never closed.
with open('/var/www/html/openWB/ramdisk/bezuga3', 'w') as f:
    f.write(str(final))
import sys
import os
import time
import getopt
import socket
import ConfigParser
import struct
import binascii
from pymodbus.payload import BinaryPayloadDecoder
from pymodbus.constants import Endian
# The device address is taken from the first command-line argument.
ipaddress = str(sys.argv[1])
from pymodbus.client.sync import ModbusTcpClient
client = ModbusTcpClient(ipaddress, port=1502)

# PV power: a 32-bit float read from two registers at address 172, unit 71.
resp = client.read_holding_registers(172, 2, unit=71)
FRegister = BinaryPayloadDecoder.fromRegisters(resp.registers, byteorder=Endian.Big, wordorder=Endian.Little)
pvwatt = int(FRegister.decode_32bit_float())
# The sign is inverted before the value is written out.
fpvwatt = pvwatt * -1
# A context manager closes the file even if the write raises.
with open('/var/www/html/openWB/ramdisk/pvwatt', 'w') as f:
    f.write(str(fpvwatt))

# EVU "A" values 1-3 (presumably per-phase grid current in amperes; confirm).
# Each is a 32-bit float rounded to two decimals before being written.
resp = client.read_holding_registers(222, 2, unit=71)
FRegister = BinaryPayloadDecoder.fromRegisters(resp.registers, byteorder=Endian.Big, wordorder=Endian.Little)
final = round(FRegister.decode_32bit_float(), 2)
with open('/var/www/html/openWB/ramdisk/bezuga1', 'w') as f:
    f.write(str(final))

resp = client.read_holding_registers(232, 2, unit=71)
FRegister = BinaryPayloadDecoder.fromRegisters(resp.registers, byteorder=Endian.Big, wordorder=Endian.Little)
final = round(FRegister.decode_32bit_float(), 2)
# NOTE(review): the excerpt ends right after this open(); the matching
# write and close are not visible here, so the statement is left unchanged.
f = open('/var/www/html/openWB/ramdisk/bezuga2', 'w')
client.
:param host: The host to connect to
:returns: an initialized SunspecClient
"""
modbus = ModbusTcpClient(host)
modbus.connect()
client = SunspecClient(modbus)
client.initialize()
return client
# --------------------------------------------------------------------------- #
# Sunspec Client
# --------------------------------------------------------------------------- #
class SunspecDecoder(BinaryPayloadDecoder):
""" A decoder that deals correctly with the sunspec
binary format.
"""
def __init__(self, payload, endian):
    """ Create a SunspecDecoder over the supplied payload

    .. note:: The byte order is always big endian, as
       specified in the protocol; the ``endian`` argument
       is not used.

    :param payload: The payload passed on to the base decoder
    :param endian: Ignored; Endian.Big is always applied
    """
    BinaryPayloadDecoder.__init__(self, payload, Endian.Big)
def decode_string(self, size=1):
""" Decodes a string from the buffer
result = input_data.bits
log.debug(result)
if "registerCount" in data_sent:
result = result[:data_sent["registerCount"]]
else:
result = result[0]
elif data_sent.get("functionCode") == 3 or data_sent.get("functionCode") == 4:
result = input_data.registers
byte_order = data_sent.get("byteOrder", "LITTLE")
reg_count = data_sent.get("registerCount", 1)
type_of_data = data_sent["type"]
try:
if byte_order == "LITTLE":
decoder = BinaryPayloadDecoder.fromRegisters(result, byteorder=Endian.Little)
elif byte_order == "BIG":
decoder = BinaryPayloadDecoder.fromRegisters(result, byteorder=Endian.Big)
else:
log.warning("byte order is not BIG or LITTLE")
continue
except Exception as e:
log.error(e)
if type_of_data == "string":
result = decoder.decode_string(2 * reg_count)
elif type_of_data == "long":
if reg_count == 1:
result = decoder.decode_16bit_int()
elif reg_count == 2:
result = decoder.decode_32bit_int()
elif reg_count == 4:
result = decoder.decode_64bit_int()
else:
log.warning("unsupported register count for long data type in response for tag %s",
# Connection opened by parent class INIT
# Retrieve Fronius-specific inverter info if the connection was successful
self._log.debug("Fronius args: " + str(modbus_IP) + " - " + str(modbus_port) )
self._log.debug("EmonFroniusModbusTcpInterfacer: Init")
if self._modcon :
# Display device firmware version and current settings
self.info =["",""]
#self._log.info("Modtcp Connected")
r2= self._con.read_holding_registers(40005-1,4,unit=1)
r3= self._con.read_holding_registers(40021-1,4,unit=1)
invBrand = BinaryPayloadDecoder.fromRegisters(r2.registers, endian=Endian.Big)
invModel = BinaryPayloadDecoder.fromRegisters(r3.registers, endian=Endian.Big)
self._log.info( self.name + " Inverter: " + invBrand.decode_string(8) + " " + invModel.decode_string(8))
swDM= self._con.read_holding_registers(40037-1,8,unit=1)
swInv= self._con.read_holding_registers(40045-1,8,unit=1)
swDMdecode = BinaryPayloadDecoder.fromRegisters(swDM.registers, endian=Endian.Big)
swInvdecode = BinaryPayloadDecoder.fromRegisters(swInv.registers, endian=Endian.Big)
self._log.info( self.name + " SW Versions: Datamanager " + swDMdecode.decode_string(16) + "- Inverter " + swInvdecode.decode_string(16))
r1 = self._con.read_holding_registers(40070-1,1,unit=1)
ssModel = BinaryPayloadDecoder.fromRegisters(r1.registers, endian=Endian.Big)
self._log.info( self.name + " SunSpec Model: " + str(ssModel.decode_16bit_uint()) )