Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Excerpt: the enclosing definition starts outside this view; `cmd` and `hold`
# are assigned before the first line shown here.
# Builds one individual solenoid configuration message and sends it through
# self.opp_connection.
minimum_off = self.get_minimum_off_time(driver)
if driver.hw_driver.use_switch:
# Add the use-switch flag value into the configuration byte.
cmd += ord(OppRs232Intf.CFG_SOL_USE_SWITCH)
# The configured number is split on '-' into exactly two parts; only the
# second part (the solenoid index) is used here.
_, solenoid = driver.config['number'].split('-')
pulse_len = self._get_pulse_ms_value(driver)
# Bytes are appended in this order: card address, CFG_IND_SOL_CMD, solenoid
# index, configuration byte, pulse length, combined hold/minimum-off byte,
# checksum, EOM_CMD.
msg = bytearray()
msg.append(driver.hw_driver.solCard.addr)
msg.extend(OppRs232Intf.CFG_IND_SOL_CMD)
msg.append(int(solenoid))
msg.append(cmd)
msg.append(pulse_len)
# `minimum_off` is shifted up by four bits and added to `hold`, so both
# travel in a single byte.
msg.append(hold + (minimum_off << 4))
# The checksum helper is given everything appended so far.
msg.extend(OppRs232Intf.calc_crc8_whole_msg(msg))
msg.extend(OppRs232Intf.EOM_CMD)
final_cmd = bytes(msg)
# Log the outgoing bytes as space-separated hex before sending.
self.log.debug("Writing individual config: %s", "".join(" 0x%02x" % b for b in final_cmd))
self.opp_connection.send(final_cmd)
def send_vers_cmd(self):
    """Ask every Gen2 card on this chain for its firmware version.

    One request frame is built per address found in
    ``self.platform.gen2_addr_arr[self.chain_serial]``: the card address,
    ``OppRs232Intf.GET_VERS_CMD``, four zero bytes, then whatever
    ``OppRs232Intf.calc_crc8_whole_msg`` returns for that frame.  The frames
    are concatenated, closed once with ``OppRs232Intf.EOM_CMD``, logged at
    debug level as hex and handed to ``self.send``.
    """
    request = bytearray()
    for address in self.platform.gen2_addr_arr[self.chain_serial]:
        frame = bytearray()
        frame.append(address)
        frame.extend(OppRs232Intf.GET_VERS_CMD)
        # Four zero bytes follow the command.
        frame.extend(bytes(4))
        # The checksum helper sees the frame as built so far.
        frame.extend(OppRs232Intf.calc_crc8_whole_msg(frame))
        request.extend(frame)

    # EOM_CMD is appended once, after all per-card frames.
    request.extend(OppRs232Intf.EOM_CMD)
    payload = bytes(request)
    hex_dump = "".join(" 0x%02x" % octet for octet in payload)
    self.log.debug("Sending get version command: %s", hex_dump)
    self.send(payload)
# Excerpt: the enclosing definition and the `if` that pairs with the `else`
# below start outside this view; `msg` and `new_color` are set there.
# Send the already-built colour table message to the processor that owns this
# card's serial chain, then count the newly used table entry.
cmd = bytes(msg)
self.log.debug("Add Neo color table entry: %s", "".join(" 0x%02x" % b for b in cmd))
self.neo_card.platform.send_to_processor(self.neo_card.chain_serial, cmd)
self.neo_card.num_color_entries += 1
else:
# No room for another colour: flag the error so the pixel is not updated.
error = True
self.log.warning("Not enough Neo color table entries. OPP only supports 32.")
# Send msg to set the neopixel
if not error:
# Bytes are appended in this order: card address, SET_IND_NEO_CMD, the pixel's
# index character as an integer, the value stored for this colour in
# color_table_dict, checksum.
msg = bytearray()
msg.append(self.neo_card.addr)
msg.extend(OppRs232Intf.SET_IND_NEO_CMD)
msg.append(ord(self.index_char))
msg.append(self.neo_card.color_table_dict[new_color])
msg.extend(OppRs232Intf.calc_crc8_whole_msg(msg))
cmd = bytes(msg)
self.log.debug("Set Neopixel color: %s", "".join(" 0x%02x" % b for b in cmd))
self.neo_card.platform.send_to_processor(self.neo_card.chain_serial, cmd)
# Excerpt: the enclosing definition and loop header start outside this view;
# `msg`, `curr_index`, `sol_mask`, `inp_mask`, `has_neo` and `whole_msg` are
# set there. msg[curr_index] is used as the card address throughout.
# A non-zero solenoid mask creates a solenoid card object for this address.
if sol_mask != 0:
self.opp_solenoid.append(OPPSolenoidCard(msg[curr_index], sol_mask, self.solDict, self))
if inp_mask != 0:
# Create the input object, and add to the command to read all inputs
self.opp_inputs.append(OPPInputCard(msg[curr_index], inp_mask, self.inpDict,
self.inpAddrDict))
# Add command to read all inputs to read input message
# Frame: card address, READ_GEN2_INP_CMD, four zero bytes, checksum.
inp_msg = bytearray()
inp_msg.append(msg[curr_index])
inp_msg.extend(OppRs232Intf.READ_GEN2_INP_CMD)
inp_msg.append(0)
inp_msg.append(0)
inp_msg.append(0)
inp_msg.append(0)
inp_msg.extend(OppRs232Intf.calc_crc8_whole_msg(inp_msg))
whole_msg.extend(inp_msg)
if has_neo:
self.opp_neopixels.append(OPPNeopixelCard(msg[curr_index], self.neoCardDict, self))
# Decide whether another record follows: EOM_CMD at offset 7 ends parsing, a
# GET_GEN2_CFG byte at offset 8 means another record starts 7 bytes further on.
# NOTE(review): this implies 7-byte records with the command at offset 1 - confirm.
if msg[curr_index + 7] == ord(OppRs232Intf.EOM_CMD):
break
elif msg[curr_index + 8] == ord(OppRs232Intf.GET_GEN2_CFG):
curr_index += 7
else:
# Neither marker was found: log the whole response as hex and stop parsing.
self.log.warning("Malformed GET_GEN2_CFG response:%s.",
"".join(" 0x%02x" % b for b in msg))
break
# TODO: This means synchronization is lost. Send EOM characters
# until they come back
def send_vers_cmd(self):
    """Request the firmware version from every known Gen2 card.

    For each address in ``self.platform.gen2AddrArr`` a frame is assembled
    from the card address, ``OppRs232Intf.GET_GET_VERS_CMD``, four zero bytes
    and the value returned by ``OppRs232Intf.calc_crc8_whole_msg``.  All
    frames are joined, closed once with ``OppRs232Intf.EOM_CMD``, logged at
    debug level as hex and written to ``self.serial_connection``.
    """
    request = bytearray()
    for address in self.platform.gen2AddrArr:
        frame = bytearray()
        frame.append(address)
        frame.extend(OppRs232Intf.GET_GET_VERS_CMD)
        # Four zero bytes follow the command.
        frame.extend(bytes(4))
        # The checksum helper sees the frame as built so far.
        frame.extend(OppRs232Intf.calc_crc8_whole_msg(frame))
        request.extend(frame)

    # EOM_CMD is appended once, after all per-card frames.
    request.extend(OppRs232Intf.EOM_CMD)
    payload = bytes(request)
    hex_dump = "".join(" 0x%02x" % octet for octet in payload)
    self.log.debug("Sending get version command: %s", hex_dump)
    self.serial_connection.write(payload)
# Excerpt: the enclosing definition starts outside this view.
# Collects one frame per incandescent card whose state changed and sends them
# all in a single write; nothing is sent when no card changed.
whole_msg = bytearray()
for incand in self.opp_incands:
# Check if any changes have been made
# (XOR of old and new state is non-zero when at least one bit differs.)
if (incand.oldState ^ incand.newState) != 0:
# Update card
incand.oldState = incand.newState
# Frame: card address, INCAND_CMD, INCAND_SET_ON_OFF, then newState as four
# bytes with the most significant byte first, then the checksum.
msg = bytearray()
msg.append(incand.addr)
msg.extend(OppRs232Intf.INCAND_CMD)
msg.extend(OppRs232Intf.INCAND_SET_ON_OFF)
msg.append((incand.newState >> 24) & 0xff)
msg.append((incand.newState >> 16) & 0xff)
msg.append((incand.newState >> 8) & 0xff)
msg.append(incand.newState & 0xff)
msg.extend(OppRs232Intf.calc_crc8_whole_msg(msg))
whole_msg.extend(msg)
# Only terminate and send when at least one frame was collected.
if len(whole_msg) != 0:
whole_msg.extend(OppRs232Intf.EOM_CMD)
send_cmd = bytes(whole_msg)
self.opp_connection.send(send_cmd)
# The debug log is written after the send and shows the bytes as hex.
self.log.debug("Update incand cmd:%s", "".join(" 0x%02x" % b for b in send_cmd))
# Excerpt: the enclosing definition starts outside this view and the excerpt
# stops before the final message is sent; `new_color` is set outside it.
# NOTE(review): the slicing below implies new_color is a string of at least six
# hex digits - confirm against the caller.
error = False
# Check if this color exists in the color table
if new_color not in self.neo_card.color_table_dict:
# Check if there are available spaces in the table
if self.neo_card.num_color_entries < 32:
# Send the command to add color table entry
# The stored value is the entry index offset by NEO_CMD_ON; it is the byte
# later sent to select this colour.
self.neo_card.color_table_dict[new_color] = self.neo_card.num_color_entries + OppRs232Intf.NEO_CMD_ON
# Frame: card address, CHNG_NEO_COLOR_TBL, entry index, three colour bytes,
# checksum.
msg = bytearray()
msg.append(self.neo_card.addr)
msg.extend(OppRs232Intf.CHNG_NEO_COLOR_TBL)
msg.append(self.neo_card.num_color_entries)
# Colour bytes are sent as hex digits 2-3, then 0-1, then the last two.
# NOTE(review): for an RRGGBB string that is green, red, blue - confirm.
msg.append(int(new_color[2:4], 16))
msg.append(int(new_color[:2], 16))
msg.append(int(new_color[-2:], 16))
msg.extend(OppRs232Intf.calc_crc8_whole_msg(msg))
cmd = bytes(msg)
self.log.debug("Add Neo color table entry: %s", "".join(" 0x%02x" % b for b in cmd))
self.neo_card.platform.send_to_processor(self.neo_card.chain_serial, cmd)
self.neo_card.num_color_entries += 1
else:
# No room for another colour: flag the error so the pixel is not updated.
error = True
self.log.warning("Not enough Neo color table entries. OPP only supports 32.")
# Send msg to set the neopixel
if not error:
# Frame: card address, SET_IND_NEO_CMD, the pixel's index character as an
# integer, the value stored for this colour in color_table_dict, checksum.
msg = bytearray()
msg.append(self.neo_card.addr)
msg.extend(OppRs232Intf.SET_IND_NEO_CMD)
msg.append(ord(self.index_char))
msg.append(self.neo_card.color_table_dict[new_color])
msg.extend(OppRs232Intf.calc_crc8_whole_msg(msg))
def send_get_gen2_cfg_cmd(self):
    """Query every Gen2 card on this chain for its configuration.

    Used to find populated wing boards.  One frame is built per address in
    ``self.platform.gen2_addr_arr[self.chain_serial]``: the card address,
    ``OppRs232Intf.GET_GEN2_CFG``, four zero bytes, then the value returned
    by ``OppRs232Intf.calc_crc8_whole_msg`` for that frame.  The frames are
    concatenated, closed once with ``OppRs232Intf.EOM_CMD``, logged at debug
    level as hex and handed to ``self.send``.
    """
    request = bytearray()
    for address in self.platform.gen2_addr_arr[self.chain_serial]:
        frame = bytearray()
        frame.append(address)
        frame.extend(OppRs232Intf.GET_GEN2_CFG)
        # Four zero bytes follow the command.
        frame.extend(bytes(4))
        # The checksum helper sees the frame as built so far.
        frame.extend(OppRs232Intf.calc_crc8_whole_msg(frame))
        request.extend(frame)

    # EOM_CMD is appended once, after all per-card frames.
    request.extend(OppRs232Intf.EOM_CMD)
    payload = bytes(request)
    hex_dump = "".join(" 0x%02x" % octet for octet in payload)
    self.log.debug("Sending get Gen2 Cfg command: %s", hex_dump)
    self.send(payload)
# Excerpt: the enclosing definition starts outside this view; `cmd`, `hold`,
# `minimum_off`, `solenoid` and `pulse_settings` are assigned there.
# in case config happens after set switch command
cmd += ord(OppRs232Intf.CFG_SOL_USE_SWITCH)
# Add the can-cancel flag value only when the switch rule allows cancelling.
if self.switch_rule.can_cancel:
cmd += ord(OppRs232Intf.CFG_SOL_CAN_CANCEL)
pulse_len = pulse_settings.duration
# Bytes are appended in this order: card address, CFG_IND_SOL_CMD, solenoid
# index, configuration byte, pulse length, combined hold/minimum-off byte,
# checksum, EOM_CMD.
msg = bytearray()
msg.append(self.sol_card.addr)
msg.extend(OppRs232Intf.CFG_IND_SOL_CMD)
msg.append(int(solenoid))
msg.append(cmd)
msg.append(pulse_len)
# `minimum_off` is shifted up by four bits and added to `hold`, so both
# travel in a single byte.
msg.append(hold + (minimum_off << 4))
# The checksum helper is given everything appended so far.
msg.extend(OppRs232Intf.calc_crc8_whole_msg(msg))
msg.extend(OppRs232Intf.EOM_CMD)
final_cmd = bytes(msg)
# Log the outgoing bytes as hex, then route them to the processor that owns
# this card's serial chain.
self.log.debug("Writing individual config: %s", "".join(" 0x%02x" % b for b in final_cmd))
self.sol_card.platform.send_to_processor(self.sol_card.chain_serial, final_cmd)