Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
response = self._parse_params(response)
def _parse_params(self, response):
    """Feed consecutive slices of *response* to each entry of ``self.params``.

    Every parameter receives the leading ``len(param)`` items of what is
    still unparsed, via its ``unserialize`` method. The part of *response*
    that no parameter consumed is returned.
    """
    remaining = response
    for field in self.params:
        chunk = remaining[:len(field)]
        field.unserialize(chunk)
        debug("PARS "+repr(str(chunk)) + " EVAL "+repr(str(field.value)))
        # Advance by what was actually sliced off; on short input this may
        # be less than len(field).
        remaining = remaining[len(chunk):]
    return remaining
def __len__(self):
    """Return 34 plus the combined length of all entries in ``self.params``.

    NOTE(review): 34 is presumably the fixed framing overhead of a
    response -- confirm against the protocol description.
    """
    return 34 + sum(len(field) for field in self.params)
class PlugwiseAckResponse(PlugwiseResponse):
    """Response with ID ``0000`` that carries a single ``status`` field."""

    ID = b'0000'

    def __init__(self, seqnr=None):
        """Register the ``status`` field and reset ``command_counter``."""
        PlugwiseResponse.__init__(self, seqnr)
        self.status = Int(0, 4)
        status_fields = [self.status]
        self.params += status_fields
        self.command_counter = None

    def __len__(self):
        """Return 18 plus the combined length of the registered params."""
        return 18 + sum(len(field) for field in self.params)
def unserialize(self, response):
try:
PlugwiseResponse.__init__(self, seqnr)
self.qin = Int(0, 2)
self.qout = Int(0, 2)
self.pingtime = Int(0, 4)
self.params += [self.qin, self.qout, self.pingtime]
class PlugwiseAssociatedNodesResponse(PlugwiseResponse):
    """Response with ID ``0019``: a node MAC id followed by an index."""

    ID = b'0019'

    def __init__(self, seqnr=None):
        """Create the ``node_mac_id`` and ``idx`` fields and register them."""
        PlugwiseResponse.__init__(self, seqnr)
        self.node_mac_id = String(None, length=16)
        self.idx = Int(0, 2)
        self.params += [
            self.node_mac_id,
            self.idx,
        ]
class PlugwiseAdvertiseNodeResponse(PlugwiseResponse):
    """Response with ID ``0006``; adds no fields beyond the base class."""

    ID = b'0006'

    def __init__(self, seqnr=None):
        """Delegate entirely to the base-class initialiser."""
        PlugwiseResponse.__init__(self, seqnr)
class PlugwiseQueryCirclePlusResponse(PlugwiseResponse):
    """Response with ID ``0002`` describing a queried network.

    This copy of the class was cut off after ``pan_id``: ``idx`` was never
    created and none of the fields were added to ``params``, so nothing
    would be parsed. It is completed here to match the full definition of
    the same class elsewhere in this file.
    """

    ID = b'0002'

    def __init__(self, seqnr=None):
        """Create all fields and register them in wire order."""
        PlugwiseResponse.__init__(self, seqnr)
        self.channel = String(None, length=2)
        self.source_mac_id = String(None, length=16)
        self.extended_pan_id = String(None, length=16)
        self.unique_network_id = String(None, length=16)
        self.new_node_mac_id = String(None, length=16)
        self.pan_id = String(None, length=4)
        self.idx = Int(0, length=2)
        self.params += [
            self.channel,
            self.source_mac_id,
            self.extended_pan_id,
            self.unique_network_id,
            self.new_node_mac_id,
            self.pan_id,
            self.idx,
        ]

    def __len__(self):
        """Return 18 plus the combined length of the registered params."""
        arglen = sum(len(x) for x in self.params)
        return 18 + arglen

    def unserialize(self, response):
        """Parse *response*, then blank the first two characters of the MAC."""
        PlugwiseResponse.unserialize(self, response)
        # Clear first two characters of mac ID, as they contain part of the
        # short PAN-ID.
        self.new_node_mac_id.value = b'00'+self.new_node_mac_id.value[2:]
class PlugwiseAssociatedNodesResponse(PlugwiseResponse):
    """Response with ID ``0019``: a node MAC id followed by an index."""

    ID = b'0019'

    def __init__(self, seqnr=None):
        """Create the ``node_mac_id`` and ``idx`` fields and register them."""
        PlugwiseResponse.__init__(self, seqnr)
        self.node_mac_id = String(None, length=16)
        self.idx = Int(0, 2)
        self.params += [
            self.node_mac_id,
            self.idx,
        ]
class PlugwiseAdvertiseNodeResponse(PlugwiseResponse):
    """Response with ID ``0006``; adds no fields beyond the base class."""

    ID = b'0006'

    def __init__(self, seqnr=None):
        """Delegate entirely to the base-class initialiser."""
        PlugwiseResponse.__init__(self, seqnr)
class PlugwiseQueryCirclePlusResponse(PlugwiseResponse):
    """Response with ID ``0002`` describing a queried network.

    The MAC clean-up statements at the end of this class had lost their
    ``def unserialize`` header and base-class call, leaving them outside
    any method. The method is restored here.
    """

    ID = b'0002'

    def __init__(self, seqnr=None):
        """Create all fields and register them in wire order."""
        PlugwiseResponse.__init__(self, seqnr)
        self.channel = String(None, length=2)
        self.source_mac_id = String(None, length=16)
        self.extended_pan_id = String(None, length=16)
        self.unique_network_id = String(None, length=16)
        self.new_node_mac_id = String(None, length=16)
        self.pan_id = String(None, length=4)
        self.idx = Int(0, length=2)
        self.params += [
            self.channel,
            self.source_mac_id,
            self.extended_pan_id,
            self.unique_network_id,
            self.new_node_mac_id,
            self.pan_id,
            self.idx,
        ]

    def __len__(self):
        """Return 18 plus the combined length of the registered params."""
        arglen = sum(len(x) for x in self.params)
        return 18 + arglen

    def unserialize(self, response):
        """Parse *response*, then blank the first two characters of the MAC."""
        PlugwiseResponse.unserialize(self, response)
        # Clear first two characters of mac ID, as they contain part of the
        # short PAN-ID.
        self.new_node_mac_id.value = b'00'+self.new_node_mac_id.value[2:]
class PlugwiseQueryCirclePlusEndResponse(PlugwiseResponse):
    """Response with ID ``0003`` that carries a single ``status`` field."""

    ID = b'0003'

    def __init__(self, seqnr=None):
        """Create the ``status`` field and register it."""
        PlugwiseResponse.__init__(self, seqnr)
        self.status = Int(0, 4)
        status_fields = [self.status]
        self.params += status_fields

    def __len__(self):
        """Return 18 plus the combined length of the registered params."""
        return 18 + sum(len(field) for field in self.params)
class PlugwiseConnectCirclePlusResponse(PlugwiseResponse):
    """Response with ID ``0005`` holding two flag-like integer fields."""

    ID = b'0005'

    def __init__(self, seqnr=None):
        """Create the ``exsisting`` and ``allowed`` fields and register them."""
        PlugwiseResponse.__init__(self, seqnr)
        # The attribute name is misspelled ("exsisting"); it is kept as-is
        # because code outside this block may access it under that name.
        self.exsisting = Int(0, 2)
        self.allowed = Int(0, 2)
        self.params += [
            self.exsisting,
            self.allowed,
        ]

    def __len__(self):
        """Return 18 plus the combined length of the registered params."""
        return 18 + sum(len(field) for field in self.params)
class PlugwiseRemoveNodeResponse(PlugwiseResponse):
ID = b'001D'
def __init__(self, seqnr = None):
def __init__(self, seqnr=None):
    """Create date/time, log-address, relay, frequency and version fields.

    All seven fields are appended to ``self.params`` in the order in which
    they are created.
    """
    PlugwiseResponse.__init__(self, seqnr)
    self.datetime = DateTime()
    self.last_logaddr = LogAddr(0, length=8)
    self.relay_state = Int(0, length=2)
    self.hz = Int(0, length=2)
    self.hw_ver = String(None, length=12)
    self.fw_ver = UnixTimestamp(0)
    self.type = Int(0, length=2)
    info_fields = [
        self.datetime,
        self.last_logaddr,
        self.relay_state,
        self.hz,
        self.hw_ver,
        self.fw_ver,
        self.type,
    ]
    self.params += info_fields
class PlugwiseStatusResponse(PlugwiseResponse):
    """Response with ID ``0011`` reporting network status fields.

    The leading and trailing unknown fields used to share one attribute
    name, so the second assignment discarded the first ``Int`` and the same
    object was registered twice in ``params``. Parsing the trailing field
    then overwrote the leading field's value. They are now two separate
    objects, ``unknown1`` and ``unknown2``.
    """

    ID = b'0011'

    def __init__(self, seqnr=None):
        """Create the five status fields and register them in wire order."""
        PlugwiseResponse.__init__(self, seqnr)
        self.unknown1 = Int(0, length=2)
        self.network_is_online = Int(0, length=2)
        self.network_id = Int(0, length=16)
        self.network_id_short = Int(0, length=4)
        self.unknown2 = Int(0, length=2)
        # Backward compatibility: ``unknown`` used to end up bound to the
        # trailing field, so keep it as an alias of that object.
        self.unknown = self.unknown2
        self.params += [
            self.unknown1,
            self.network_is_online,
            self.network_id,
            self.network_id_short,
            self.unknown2,
        ]
raise
class PlugwiseAckMacResponse(PlugwiseAckResponse):
    """Acknowledge response (ID ``0000``) extended with an ``acqmac`` field."""

    ID = b'0000'

    def __init__(self, seqnr=None):
        """Append the ``acqmac`` field after those of the base class."""
        PlugwiseAckResponse.__init__(self, seqnr)
        self.acqmac = String(None, length=16)
        self.params += [self.acqmac]

    def unserialize(self, response):
        """Parse *response* via the base class, then copy the MAC to ``mac``."""
        PlugwiseAckResponse.unserialize(self, response)
        # NOTE(review): if ``acqmac.value`` is a bytes object on Python 3,
        # str() yields its repr ("b'...'") rather than decoded text --
        # confirm the type of String.value.
        parsed_mac = self.acqmac.value
        self.mac = str(parsed_mac)
class PlugwiseCalibrationResponse(PlugwiseResponse):
    """Response with ID ``0027`` holding four floating-point fields."""

    ID = b'0027'

    def __init__(self, seqnr=None):
        """Create the gain and offset fields and register them."""
        PlugwiseResponse.__init__(self, seqnr)
        self.gain_a = Float(0, 8)
        self.gain_b = Float(0, 8)
        self.off_tot = Float(0, 8)
        self.off_noise = Float(0, 8)
        self.params += [
            self.gain_a,
            self.gain_b,
            self.off_tot,
            self.off_noise,
        ]
class PlugwiseClockInfoResponse(PlugwiseResponse):
    """Response with ID ``003F`` carrying clock information.

    This copy of the class was cut off after ``time`` and never registered
    any field in ``params``. It is completed here to match the full
    definition of the same class elsewhere in this file.
    """

    ID = b'003F'

    def __init__(self, seqnr=None):
        """Create the clock fields and register them in wire order."""
        PlugwiseResponse.__init__(self, seqnr)
        self.time = Time()
        self.day_of_week = Int(0, 2)
        self.unknown = Int(0, 2)
        self.scheduleCRC = Int(0, 4)
        self.params += [
            self.time,
            self.day_of_week,
            self.unknown,
            self.scheduleCRC,
        ]
def unserialize(self, response):
    """Parse *response* via ``PlugwiseAckResponse``, then set ``self.mac``."""
    PlugwiseAckResponse.unserialize(self, response)
    # NOTE(review): if ``acqmac.value`` is a bytes object on Python 3, str()
    # yields its repr ("b'...'") rather than decoded text -- confirm.
    parsed_mac = self.acqmac.value
    self.mac = str(parsed_mac)
class PlugwiseCalibrationResponse(PlugwiseResponse):
    """Response with ID ``0027`` holding four floating-point fields."""

    ID = b'0027'

    def __init__(self, seqnr=None):
        """Create the gain and offset fields and register them."""
        PlugwiseResponse.__init__(self, seqnr)
        self.gain_a = Float(0, 8)
        self.gain_b = Float(0, 8)
        self.off_tot = Float(0, 8)
        self.off_noise = Float(0, 8)
        self.params += [
            self.gain_a,
            self.gain_b,
            self.off_tot,
            self.off_noise,
        ]
class PlugwiseClockInfoResponse(PlugwiseResponse):
    """Response with ID ``003F`` carrying clock information."""

    ID = b'003F'

    def __init__(self, seqnr=None):
        """Create the clock fields and register them in wire order."""
        PlugwiseResponse.__init__(self, seqnr)
        self.time = Time()
        self.day_of_week = Int(0, 2)
        self.unknown = Int(0, 2)
        self.scheduleCRC = Int(0, 4)
        clock_fields = [
            self.time,
            self.day_of_week,
            self.unknown,
            self.scheduleCRC,
        ]
        self.params += clock_fields
class PlugwisePowerUsageResponse(PlugwiseResponse):
"""returns power usage as impulse counters for several different timeframes
"""
ID = b'0013'
def __init__(self, seqnr = None):
self.unique_network_id = String(None, length=16)
self.new_node_mac_id = String(None, length=16)
self.pan_id = String(None, length=4)
self.idx = Int(0, length=2)
self.params += [self.channel, self.source_mac_id, self.extended_pan_id, self.unique_network_id, self.new_node_mac_id, self.pan_id, self.idx]
def __len__(self):
    """Return 18 plus the combined length of all entries in ``self.params``.

    NOTE(review): 18 is presumably the fixed framing overhead of this
    response type -- confirm against the protocol description.
    """
    return 18 + sum(len(field) for field in self.params)
def unserialize(self, response):
    """Parse *response*, then blank the first two characters of the MAC."""
    PlugwiseResponse.unserialize(self, response)
    # Clear first two characters of mac ID, as they contain part of the
    # short PAN-ID.
    mac_tail = self.new_node_mac_id.value[2:]
    self.new_node_mac_id.value = b'00'+mac_tail
class PlugwiseQueryCirclePlusEndResponse(PlugwiseResponse):
    """Response with ID ``0003`` that carries a single ``status`` field."""

    ID = b'0003'

    def __init__(self, seqnr=None):
        """Create the ``status`` field and register it."""
        PlugwiseResponse.__init__(self, seqnr)
        self.status = Int(0, 4)
        status_fields = [self.status]
        self.params += status_fields

    def __len__(self):
        """Return 18 plus the combined length of the registered params."""
        return 18 + sum(len(field) for field in self.params)
class PlugwiseConnectCirclePlusResponse(PlugwiseResponse):
    """Response with ID ``0005`` holding two flag-like integer fields.

    This copy of the class was cut off right after the base-class call and
    defined no fields. It is completed here to match the full definition of
    the same class elsewhere in this file.
    """

    ID = b'0005'

    def __init__(self, seqnr=None):
        """Create the ``exsisting`` and ``allowed`` fields and register them."""
        PlugwiseResponse.__init__(self, seqnr)
        # The attribute name is misspelled ("exsisting"); it is kept as-is
        # to stay consistent with the other definition of this class.
        self.exsisting = Int(0, 2)
        self.allowed = Int(0, 2)
        self.params += [self.exsisting, self.allowed]

    def __len__(self):
        """Return 18 plus the combined length of the registered params."""
        arglen = sum(len(x) for x in self.params)
        return 18 + arglen
]
class PlugwisePowerBufferResponseRaw(PlugwiseResponse):
    """Historical power-usage response kept as an unparsed payload.

    Each response contains 4 log buffers and each log buffer contains data
    for 1 hour. Here they are held as one raw string, followed by the log
    address.
    """

    ID = b'0049'

    def __init__(self, seqnr=None):
        """Create the ``raw`` and ``logaddr`` fields and register them."""
        PlugwiseResponse.__init__(self, seqnr)
        self.raw = String(None, length=64)
        self.logaddr = LogAddr(0, length=8)
        self.params += [
            self.raw,
            self.logaddr,
        ]
class PlugwiseInfoResponse(PlugwiseResponse):
    """Response with ID ``0024`` carrying device information fields."""

    ID = b'0024'

    def __init__(self, seqnr=None):
        """Create the seven information fields and register them in order."""
        PlugwiseResponse.__init__(self, seqnr)
        self.datetime = DateTime()
        self.last_logaddr = LogAddr(0, length=8)
        self.relay_state = Int(0, length=2)
        self.hz = Int(0, length=2)
        self.hw_ver = String(None, length=12)
        self.fw_ver = UnixTimestamp(0)
        self.type = Int(0, length=2)
        info_fields = [
            self.datetime,
            self.last_logaddr,
            self.relay_state,
            self.hz,
            self.hw_ver,
            self.fw_ver,
            self.type,
        ]
        self.params += info_fields
class PlugwisePowerUsageResponse(PlugwiseResponse):
    """Power usage reported as impulse counters for several timeframes."""

    ID = b'0013'

    def __init__(self, seqnr=None):
        """Create the pulse-counter fields and register them in wire order."""
        PlugwiseResponse.__init__(self, seqnr)
        self.pulse_1s = SInt(0, 4)
        self.pulse_8s = SInt(0, 4)
        # Unlike the other counters, this one is an Int rather than an SInt.
        self.pulse_hour = Int(0, 8)
        self.pulse_prod_hour = SInt(0, 8)
        self.unknown2 = Int(0, 4)
        self.params += [
            self.pulse_1s,
            self.pulse_8s,
            self.pulse_hour,
            self.pulse_prod_hour,
            self.unknown2,
        ]
class PlugwisePowerBufferResponse(PlugwiseResponse):
"""returns information about historical power usage
each response contains 4 log buffers and each log buffer contains data for 1 hour
"""
ID = b'0049'
def __init__(self, seqnr = None):
PlugwiseResponse.__init__(self, seqnr)
self.logdate1 = DateTime()
self.pulses1 = SInt(0, 8)
self.logdate2 = DateTime()
self.pulses2 = SInt(0, 8)
self.logdate3 = DateTime()
self.pulses3 = SInt(0, 8)
self.logdate4 = DateTime()
self.pulses4 = SInt(0, 8)
self.logaddr = LogAddr(0, length=8)