Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def testPayloadDecoderCoilFactory(self):
""" Test the payload decoder reset functionality """
payload = [1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1]
decoder = BinaryPayloadDecoder.fromCoils(payload, byteorder=Endian.Little)
encoded = b'\x88\x11'
self.assertEqual(encoded, decoder.decode_string(2))
decoder = BinaryPayloadDecoder.fromCoils(payload, byteorder=Endian.Big)
encoded = b'\x88\x11'
self.assertEqual(encoded, decoder.decode_string(2))
self.assertRaises(ParameterException,
lambda: BinaryPayloadDecoder.fromCoils('abcd'))
except:
print ("usage: %s [idslave]" % sys.argv[0])
sys.exit(-1)
# get running mode
print ("modbus cmd: 0x03 value: 0x0000 length: 0x01\n")
result = client.read_holding_registers(address=0x0000, count=0x01, unit=idslave)
decoder = BinaryPayloadDecoder.fromRegisters(result.registers, byteorder=Endian.Big, wordorder=Endian.Big)
print (decoder.decode_16bit_int(), " (running mode)\n")
print ("")
# get loader version
print ("modbus cmd: 0x03 value: 0x0001 length: 0x02\n")
result = client.read_holding_registers(address=0x0001, count=0x02, unit=idslave)
decoder = BinaryPayloadDecoder.fromRegisters(result.registers, byteorder=Endian.Big, wordorder=Endian.Big)
x = decoder.decode_32bit_int();
print (''.join(chr((x>>8*(4-byte-1))&0xFF) for byte in range(4)) , " (software version)\n")
print ("")
client.close()
def ReadUInt64(self,addr):
data=self.client.read_holding_registers(addr,4,unit=71)
UInt64register = BinaryPayloadDecoder.fromRegisters(data.registers, byteorder=Endian.Big, wordorder=Endian.Big)
result = UInt64register.decode_64bit_uint()
return(result)
"""
# Initialization
super(EmonFroniusModbusTcpInterfacer, self).__init__(name)
# Connection opened by parent class INIT
# Retrieve fronius specific inverter info if connection successfull if connection successfull
self._log.debug("Fronius args: " + str(modbus_IP) + " - " + str(modbus_port) )
self._log.debug("EmonFroniusModbusTcpInterfacer: Init")
if self._modcon :
# Display device firmware version and current settings
self.info =["",""]
#self._log.info("Modtcp Connected")
r2= self._con.read_holding_registers(40005-1,4,unit=1)
r3= self._con.read_holding_registers(40021-1,4,unit=1)
invBrand = BinaryPayloadDecoder.fromRegisters(r2.registers, endian=Endian.Big)
invModel = BinaryPayloadDecoder.fromRegisters(r3.registers, endian=Endian.Big)
self._log.info( self.name + " Inverter: " + invBrand.decode_string(8) + " " + invModel.decode_string(8))
swDM= self._con.read_holding_registers(40037-1,8,unit=1)
swInv= self._con.read_holding_registers(40045-1,8,unit=1)
swDMdecode = BinaryPayloadDecoder.fromRegisters(swDM.registers, endian=Endian.Big)
swInvdecode = BinaryPayloadDecoder.fromRegisters(swInv.registers, endian=Endian.Big)
self._log.info( self.name + " SW Versions: Datamanager " + swDMdecode.decode_string(16) + "- Inverter " + swInvdecode.decode_string(16))
r1 = self._con.read_holding_registers(40070-1,1,unit=1)
ssModel = BinaryPayloadDecoder.fromRegisters(r1.registers, endian=Endian.Big)
self._log.info( self.name + " SunSpec Model: " + str(ssModel.decode_16bit_uint()) )
item(t2, 'Pluggit')
# Abluft innen
if values == self._modbusRegisterDic['prmRamIdxT3']:
t3 = self._Pluggit.read_holding_registers(values, 2, unit=22)
decodert3 = BinaryPayloadDecoder.fromRegisters(t3.registers, endian=Endian.Big)
t3 = decodert3.decode_32bit_float()
t3 = round(t3, 2)
# logger.debug("Pluggit: Abluft innen: {0:4.1f}".format(t3))
# logger.debug("Pluggit: Abluft innen: {0}".format(t3))
item(t3, 'Pluggit')
# Fortluft außen
if values == self._modbusRegisterDic['prmRamIdxT4']:
t4 = self._Pluggit.read_holding_registers(values, 2, unit=22)
decodert4 = BinaryPayloadDecoder.fromRegisters(t4.registers, endian=Endian.Big)
t4 = decodert4.decode_32bit_float()
t4 = round(t4, 2)
# logger.debug("Pluggit: Fortluft außen: {0:4.1f}".format(t4))
# logger.debug("Pluggit: Fortluft außen: {0}".format(t4))
item(t4, 'Pluggit')
# logger.debug("Pluggit: ------------------------------------------> Ende der Schleife vor sleep, Durchlauf Nr. {0}".format(myCounter))
time.sleep(0.1)
# myCounter += 1
except Exception as e:
logger.error("Pluggit: something went wrong in the refresh function: {0}".format(e))
return
end_time = time.time()
cycletime = end_time - start_time
logger.debug("Pluggit: cycle took {0} seconds".format(cycletime))
import socket
import ConfigParser
import struct
import binascii
from pymodbus.constants import Endian
from pymodbus.payload import BinaryPayloadDecoder
ipaddress = str(sys.argv[1])
modbid = int(sys.argv[2])
from pymodbus.client.sync import ModbusTcpClient
client = ModbusTcpClient(ipaddress, port=502)
connection = client.connect()
#grid power
resp= client.read_holding_registers(2600,1,unit=modbid)
decoder = BinaryPayloadDecoder.fromRegisters(resp.registers,byteorder=Endian.Big,wordorder=Endian.Big)
w1 = str(decoder.decode_16bit_int())
resp= client.read_holding_registers(2601,1,unit=modbid)
decoder = BinaryPayloadDecoder.fromRegisters(resp.registers,byteorder=Endian.Big,wordorder=Endian.Big)
w2 = str(decoder.decode_16bit_int())
resp= client.read_holding_registers(2602,1,unit=modbid)
decoder = BinaryPayloadDecoder.fromRegisters(resp.registers,byteorder=Endian.Big,wordorder=Endian.Big)
w3 = str(decoder.decode_16bit_int())
watt = int(w1) + int(w2) + int(w3)
f = open('/var/www/html/openWB/ramdisk/wattbezug', 'w')
f.write(str(watt))
f.close()
#grid ampere
resp= client.read_holding_registers(2617,1,unit=modbid)
decoder = BinaryPayloadDecoder.fromRegisters(resp.registers,byteorder=Endian.Big,wordorder=Endian.Big)
a1 = str(decoder.decode_16bit_int())
def convert(self, config, data):
byte_order = config["byteOrder"] if config.get("byteOrder") else "LITTLE"
if byte_order == "LITTLE":
builder = BinaryPayloadBuilder(byteorder=Endian.Little)
elif byte_order == "BIG":
builder = BinaryPayloadBuilder(byteorder=Endian.Big)
else:
log.warning("byte order is not BIG or LITTLE")
return
reg_count = config.get("registerCount", 1)
value = config["value"]
if config.get("tag") is not None:
tags = (findall('[A-Z][a-z]*', config["tag"]))
if "Coil" in tags:
builder.add_bits(value)
elif "String" in tags:
builder.add_string(value)
elif "Double" in tags:
if reg_count == 4:
builder.add_64bit_float(value)
else:
log.warning("unsupported amount of registers with double type for device %s in Downlink converter",
def fromRegisters(klass, registers, byteorder=Endian.Little,
wordorder=Endian.Big):
""" Initialize a payload decoder with the result of
reading a collection of registers from a modbus device.
The registers are treated as a list of 2 byte values.
We have to do this because of how the data has already
been decoded by the rest of the library.
:param registers: The register results to initialize with
:param byteorder: The Byte order of each word
:param wordorder: The endianess of the word (when wordcount is >= 2)
:returns: An initialized PayloadDecoder
"""
_logger.debug(registers)
if isinstance(registers, list): # repack into flat binary
payload = b''.join(pack('!H', x) for x in registers)
return klass(payload, byteorder, wordorder)
def getBinaryPayloadDecoderFromRegisters(registers,byteorderLittle=True,wordorderLittle=False):
import pymodbus.version as pymodbus_version
from pymodbus.constants import Endian
from pymodbus.payload import BinaryPayloadDecoder
if byteorderLittle:
byteorder = Endian.Little
else:
byteorder = Endian.Big
if wordorderLittle:
wordorder = Endian.Little
else:
wordorder = Endian.Big
if pymodbus_version.version.major > 1 or (pymodbus_version.version.major == 1 and pymodbus_version.version.minor > 3):
# pymodbus v1.4 and newer
return BinaryPayloadDecoder.fromRegisters(registers, byteorder=byteorder, wordorder=wordorder)
else:
# pymodbus v1.3 and older
return BinaryPayloadDecoder.fromRegisters(registers, endian=byteorder)
from pymodbus.payload import BinaryPayloadDecoder
from pymodbus.constants import Endian
ipaddress = str(sys.argv[1])
from pymodbus.client.sync import ModbusTcpClient
client = ModbusTcpClient(ipaddress, port=1502)
#PV Leistung
resp= client.read_holding_registers(100,2,unit=71)
FRegister = BinaryPayloadDecoder.fromRegisters(resp.registers, byteorder=Endian.Big, wordorder=Endian.Little)
pvwatt =int(FRegister.decode_32bit_float())
fpvwatt = pvwatt * -1
f = open('/var/www/html/openWB/ramdisk/pvwatt', 'w')
f.write(str(fpvwatt))
f.close()
#pv kwh
resp= client.read_holding_registers(320,2,unit=71)
FRegister = BinaryPayloadDecoder.fromRegisters(resp.registers, byteorder=Endian.Big, wordorder=Endian.Little)
final =int(FRegister.decode_32bit_float())
f = open('/var/www/html/openWB/ramdisk/pvkwh', 'w')
f.write(str(final))
f.close()
pvkwhk= final / 1000
f = open('/var/www/html/openWB/ramdisk/pvkwhk', 'w')
f.write(str(pvkwhk))
f.close()