Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __send_inquire_node_id(self):
"""Send a CS_INQUIRE_NODE_ID request and decode the reply.

Builds an 8-byte message whose first byte is CS_INQUIRE_NODE_ID (the
remaining bytes stay zero), passes it to __send_command, and unpacks two
values (cs, current_node_id) from the response.

NOTE(review): this snippet is cut off inside the struct.unpack_from call;
the format string and everything after it (including the return
statement) are not visible here.

:return:
Current node id
:rtype: int
"""
# bytearray(8) is zero-filled, so only the first byte needs setting.
message = bytearray(8)
message[0] = CS_INQUIRE_NODE_ID
response = self.__send_command(message)
cs, current_node_id = struct.unpack_from("
def __send_configure(self, req_cs, value1=0, value2=0):
"""Send a message to set a key with values.

Builds an 8-byte message laid out as: byte 0 = req_cs, byte 1 = value1,
byte 2 = value2, remaining bytes zero. The message is passed to
__send_command and two values (res_cs, error_code) are unpacked from the
response.

Because the message is a bytearray, req_cs, value1 and value2 must each
be an integer in range(256); anything else raises on assignment.

NOTE(review): this snippet is cut off inside the struct.unpack_from call;
the format string and any handling of res_cs / error_code are not
visible here.

:param req_cs: value written to byte 0 of the request
:param value1: value written to byte 1 of the request (default 0)
:param value2: value written to byte 2 of the request (default 0)
"""
# bytearray(8) is zero-filled; only the first three bytes are set below.
message = bytearray(8)
message[0] = req_cs
message[1] = value1
message[2] = value2
response = self.__send_command(message)
res_cs, error_code = struct.unpack_from("
def __send_configure(self, req_cs, value1=0, value2=0):
"""Send a message to set a key with values.

Builds an 8-byte message laid out as: byte 0 = req_cs, byte 1 = value1,
byte 2 = value2, remaining bytes zero. The message is passed to
__send_command and two values (res_cs, error_code) are unpacked from the
response.

Because the message is a bytearray, req_cs, value1 and value2 must each
be an integer in range(256); anything else raises on assignment.

NOTE(review): this definition appears twice verbatim in this file, and
this snippet is cut off inside the struct.unpack_from call; the format
string and any handling of res_cs / error_code are not visible here.

:param req_cs: value written to byte 0 of the request
:param value1: value written to byte 1 of the request (default 0)
:param value2: value written to byte 2 of the request (default 0)
"""
# bytearray(8) is zero-filled; only the first three bytes are set below.
message = bytearray(8)
message[0] = req_cs
message[1] = value1
message[2] = value2
response = self.__send_command(message)
res_cs, error_code = struct.unpack_from("
def __send_inquire_lss_address(self, req_cs):
"""Send an inquiry identified by req_cs and decode the reply.

Builds an 8-byte message whose first byte is req_cs (the remaining bytes
stay zero), passes it to __send_command, and unpacks two values
(res_cs, part_of_address) from the response.

NOTE(review): this snippet is cut off inside the struct.unpack_from call;
the format string and everything after it (including the return
statement) are not visible here.

:param req_cs: value written to byte 0 of the request; must be an
integer in range(256) because the message is a bytearray
:return:
part of address. e.g., vendor ID or product code, ..
:rtype: int
"""
# bytearray(8) is zero-filled, so only the first byte needs setting.
message = bytearray(8)
message[0] = req_cs
response = self.__send_command(message)
res_cs, part_of_address = struct.unpack_from("
if not self.responses.empty():
logger.info("There were unexpected messages in the queue")
self.responses = queue.Queue()
self.network.send_message(self.LSS_TX_COBID, message)
if not bool(message[0] in ListMessageNeedResponse):
return response
# Wait for the slave to respond
# TODO check if the response is LSS response message
try:
response = self.responses.get(
block=True, timeout=self.RESPONSE_TIMEOUT)
except queue.Empty:
raise LssError("No LSS response received")
return response