Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def reconfigure_driver(self, pulse_settings: PulseSettings, hold_settings: Optional[HoldSettings],
recycle: bool = False):
"""Reconfigure a driver."""
new_config_state = (pulse_settings, hold_settings, recycle, bool(self.switch_rule))
# if config would not change do nothing
if new_config_state == self._config_state:
return
self._config_state = new_config_state
# If hold is 0, set the auto clear bit
if not hold_settings or not hold_settings.power:
cmd = ord(OppRs232Intf.CFG_SOL_AUTO_CLR)
hold = 0
else:
cmd = 0
hold = int(hold_settings.power * 16)
if hold >= 16:
if self.sol_card.platform.min_version[self.sol_card.chain_serial] >= 0x00020000:
# set flag for full power
cmd += ord(OppRs232Intf.CFG_SOL_ON_OFF)
hold = 0
else:
hold = 15
minimum_off = self.get_minimum_off_time(recycle)
_, _, solenoid = self.number.split('-')
# Before version 0.2.0.0 set solenoid input wasn't available.
0-255 each.
"""
new_color = "{0}{1}{2}".format(hex(int(color[0]))[2:].zfill(2),
hex(int(color[1]))[2:].zfill(2),
hex(int(color[2]))[2:].zfill(2))
error = False
# Check if this color exists in the color table
if new_color not in self.neo_card.color_table_dict:
# Check if there are available spaces in the table
if self.neo_card.num_color_entries < 32:
# Send the command to add color table entry
self.neo_card.color_table_dict[new_color] = self.neo_card.num_color_entries + OppRs232Intf.NEO_CMD_ON
msg = bytearray()
msg.append(self.neo_card.addr)
msg.extend(OppRs232Intf.CHNG_NEO_COLOR_TBL)
msg.append(self.neo_card.num_color_entries)
msg.append(int(new_color[2:4], 16))
msg.append(int(new_color[:2], 16))
msg.append(int(new_color[-2:], 16))
msg.extend(OppRs232Intf.calc_crc8_whole_msg(msg))
cmd = bytes(msg)
self.log.debug("Add Neo color table entry: %s", "".join(" 0x%02x" % b for b in cmd))
self.neo_card.platform.send_to_processor(self.neo_card.chain_serial, cmd)
self.neo_card.num_color_entries += 1
else:
error = True
self.log.warning("Not enough Neo color table entries. OPP only supports 32.")
# Send msg to set the neopixel
if not error:
msg = bytearray()
def send_get_gen2_cfg_cmd(self):
"""Send get gen2 configuration message to find populated wing boards."""
whole_msg = bytearray()
for card_addr in self.platform.gen2_addr_arr[self.chain_serial]:
msg = bytearray()
msg.append(card_addr)
msg.extend(OppRs232Intf.GET_GEN2_CFG)
msg.append(0)
msg.append(0)
msg.append(0)
msg.append(0)
msg.extend(OppRs232Intf.calc_crc8_whole_msg(msg))
whole_msg.extend(msg)
whole_msg.extend(OppRs232Intf.EOM_CMD)
cmd = bytes(whole_msg)
self.log.debug("Sending get Gen2 Cfg command: %s", "".join(" 0x%02x" % b for b in cmd))
self.send(cmd)
msg.append(int(new_color[:2], 16))
msg.append(int(new_color[-2:], 16))
msg.extend(OppRs232Intf.calc_crc8_whole_msg(msg))
cmd = bytes(msg)
self.log.debug("Add Neo color table entry: %s", "".join(" 0x%02x" % b for b in cmd))
self.neo_card.platform.send_to_processor(self.neo_card.chain_serial, cmd)
self.neo_card.num_color_entries += 1
else:
error = True
self.log.warning("Not enough Neo color table entries. OPP only supports 32.")
# Send msg to set the neopixel
if not error:
msg = bytearray()
msg.append(self.neo_card.addr)
msg.extend(OppRs232Intf.SET_IND_NEO_CMD)
msg.append(ord(self.index_char))
msg.append(self.neo_card.color_table_dict[new_color])
msg.extend(OppRs232Intf.calc_crc8_whole_msg(msg))
cmd = bytes(msg)
self.log.debug("Set Neopixel color: %s", "".join(" 0x%02x" % b for b in cmd))
self.neo_card.platform.send_to_processor(self.neo_card.chain_serial, cmd)
while True:
if (count % 10) == 0:
self.log.debug("Sending EOM command to port '%s'",
self.serial_connection.name)
count += 1
self.serial_connection.write(OppRs232Intf.EOM_CMD)
time.sleep(.01)
resp = self.serial_connection.read(30)
if resp.startswith(OppRs232Intf.EOM_CMD):
break
if count == 100:
raise AssertionError('No response from OPP hardware: {}'.format(self.serial_connection.name))
# Send inventory command to figure out number of cards
msg = bytearray()
msg.extend(OppRs232Intf.INV_CMD)
msg.extend(OppRs232Intf.EOM_CMD)
cmd = bytes(msg)
self.log.debug("Sending inventory command: %s", "".join(" 0x%02x" % b for b in cmd))
self.serial_connection.write(cmd)
time.sleep(.1)
resp = self.serial_connection.read(200)
# resp will contain the inventory response.
self.platform.process_received_message(resp)
# Now send get gen2 configuration message to find populated wing boards
self.send_get_gen2_cfg_cmd()
time.sleep(.1)
def send_get_gen2_cfg_cmd(self):
# Now send get gen2 configuration message to find populated wing boards
whole_msg = bytearray()
for cardAddr in self.platform.gen2AddrArr:
# Turn on the bulbs that are non-zero
msg = bytearray()
msg.append(cardAddr)
msg.extend(OppRs232Intf.GET_GEN2_CFG)
msg.append(0)
msg.append(0)
msg.append(0)
msg.append(0)
msg.extend(OppRs232Intf.calc_crc8_whole_msg(msg))
whole_msg.extend(msg)
whole_msg.extend(OppRs232Intf.EOM_CMD)
cmd = bytes(whole_msg)
self.log.debug("Sending get Gen2 Cfg command: %s", "".join(" 0x%02x" % b for b in cmd))
self.serial_connection.write(cmd)
def send_get_gen2_cfg_cmd(self):
"""Send get gen2 configuration message to find populated wing boards."""
whole_msg = bytearray()
for card_addr in self.platform.gen2_addr_arr[self.chain_serial]:
msg = bytearray()
msg.append(card_addr)
msg.extend(OppRs232Intf.GET_GEN2_CFG)
msg.append(0)
msg.append(0)
msg.append(0)
msg.append(0)
msg.extend(OppRs232Intf.calc_crc8_whole_msg(msg))
whole_msg.extend(msg)
whole_msg.extend(OppRs232Intf.EOM_CMD)
cmd = bytes(whole_msg)
self.log.debug("Sending get Gen2 Cfg command: %s", "".join(" 0x%02x" % b for b in cmd))
self.send(cmd)
def _kick_coil(self, sol_int, on):
mask = 1 << sol_int
msg = bytearray()
msg.append(self.sol_card.addr)
msg.extend(OppRs232Intf.KICK_SOL_CMD)
if on:
msg.append((mask >> 8) & 0xff)
msg.append(mask & 0xff)
else:
msg.append(0)
msg.append(0)
msg.append((mask >> 8) & 0xff)
msg.append(mask & 0xff)
msg.extend(OppRs232Intf.calc_crc8_whole_msg(msg))
cmd = bytes(msg)
self.log.debug("Triggering solenoid driver: %s", "".join(" 0x%02x" % b for b in cmd))
self.sol_card.platform.send_to_processor(self.sol_card.chain_serial, cmd)
Note: This could be made much more efficient by supporting a command
that simply sets the state of all 32 of the LEDs as either on or off.
"""
whole_msg = bytearray()
for incand in self.opp_incands:
# Check if any changes have been made
if (incand.oldState ^ incand.newState) != 0:
# Update card
incand.oldState = incand.newState
msg = bytearray()
msg.append(incand.addr)
msg.extend(OppRs232Intf.INCAND_CMD)
msg.extend(OppRs232Intf.INCAND_SET_ON_OFF)
msg.append((incand.newState >> 24) & 0xff)
msg.append((incand.newState >> 16) & 0xff)
msg.append((incand.newState >> 8) & 0xff)
msg.append(incand.newState & 0xff)
msg.extend(OppRs232Intf.calc_crc8_whole_msg(msg))
whole_msg.extend(msg)
if len(whole_msg) != 0:
whole_msg.extend(OppRs232Intf.EOM_CMD)
send_cmd = bytes(whole_msg)
self.opp_connection.send(send_cmd)
self.log.debug("Update incand cmd:%s", "".join(" 0x%02x" % b for b in send_cmd))