Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def unserialize(self, response):
# FIXME: avoid magic numbers
header5 = False
header_start = response.find(PlugwiseMessage.PACKET_HEADER5)
if header_start == 0:
#A response from a circle seems to be preceded by the x/83 in the header
#Just strip this character to not further complicate the code
response = response[1:]
header5 = True
header, self.function_code, self.command_counter = struct.unpack("4s4s4s", response[:12])
crc, footer = struct.unpack("4s2s", response[-6:])
raw_msg_len = len(response)
#check for protocol errors
protocol_error = ''
if footer != self.PACKET_FOOTER:
protocol_error = "broken footer!"
else:
ID = b'001D'
def __init__(self, seqnr = None):
    """Set up the response and register its two parameters.

    seqnr is handed unchanged to PlugwiseResponse.__init__. The message
    then gets a 16-long String ``node_mac_id`` and an ``Int(0, 2)``
    ``status``, appended to ``self.params`` in that order.
    """
    PlugwiseResponse.__init__(self, seqnr)
    self.node_mac_id = String(None, length=16)
    self.status = Int(0, 2)
    # order matters: node_mac_id first, then status
    self.params.extend([self.node_mac_id, self.status])
class PlugwiseAckAssociationResponse(PlugwiseResponse):
    """Response with message ID ``0061``.

    The ``seqnr`` constructor argument is accepted but ignored: the value
    passed on to PlugwiseResponse.__init__ is always 0xFFFD.
    """

    ID = b'0061'

    def __init__(self, seqnr = None):
        """Initialise the response with the fixed sequence number 0xFFFD."""
        # sequence number is always FFFD, whatever the caller supplied
        fixed_seqnr = 0xFFFD
        PlugwiseResponse.__init__(self, fixed_seqnr)
class PlugwiseRequest(PlugwiseMessage):
    """Message that carries a MAC and a list of arguments."""

    def __init__(self, mac):
        """Initialise the message with no arguments and the given MAC.

        mac is stored as ``self.mac`` after being passed through ``sc``;
        ``self.args`` starts out as an empty list.
        """
        PlugwiseMessage.__init__(self)
        self.args = list()
        converted_mac = sc(mac)
        self.mac = converted_mac
class PlugwiseStatusRequest(PlugwiseRequest):
    """Get Stick Status"""

    ID = b'000A'

    def __init__(self):
        """Create the request with an empty MAC."""
        # this message doesn't send a MAC address, so hand the base
        # class an empty one
        no_mac = ''
        PlugwiseRequest.__init__(self, no_mac)
class PlugwisePowerUsageRequest(PlugwiseRequest):
    """Request with message ID ``0012``; construction is inherited from PlugwiseRequest."""

    ID = b'0012'
def serialize(self):
    """Return the message as one serialized frame that can go out on the wire.

    The frame is PACKET_HEADER + ID + mac + arguments + checksum +
    PACKET_FOOTER, where the checksum is calculated over
    ID + mac + arguments. The frame is also reported through ``logcomm``
    before it is returned.
    """
    joined_args = b''.join(arg.serialize() for arg in self.args)
    # the checksum covers everything between the header and the checksum
    payload = self.ID + self.mac + sc(joined_args)
    crc = self.calculate_checksum(payload)
    frame = self.PACKET_HEADER + payload + crc + self.PACKET_FOOTER
    log_fields = (len(frame), self.ID, self.mac, sc(joined_args), crc)
    logcomm("SEND %4d ---> %4s %16s %s %4s <---" % log_fields)
    return frame
def calculate_checksum(self, s):
    """Return the checksum of s as text.

    ``crc_fun(s)`` is formatted as uppercase hexadecimal, zero-padded to
    at least four digits, and the result is passed through ``sc``.
    """
    crc_value = crc_fun(s)
    hex_digits = "%04X" % crc_value
    return sc(hex_digits)
#class PlugwiseBaseResponse(PlugwiseMessage):
class PlugwiseResponse(PlugwiseMessage):
ID = b'FFFF'
def __init__(self, seqnr = None):
    """Initialise an empty response.

    seqnr is stored as ``expected_command_counter``. ``params`` starts as
    an empty list, and ``mac``, ``function_code`` and ``command_counter``
    start as None.
    """
    PlugwiseMessage.__init__(self)
    self.params = []
    # nothing has been received yet, so these fields are still unknown
    self.mac = self.function_code = self.command_counter = None
    self.expected_command_counter = seqnr
def unserialize(self, response):
# FIXME: avoid magic numbers
header5 = False
header_start = response.find(PlugwiseMessage.PACKET_HEADER5)