Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _crc_message(self, msg, term=True):
crc_msg = msg + opp.OppRs232Intf.calc_crc8_part_msg(msg, 0, len(msg))
if term:
crc_msg += '\xff'
return crc_msg
sys.exit()
# Use new update individual solenoid command
_, solenoid = config['number'].split('-')
opp_sol = self.solDict[config['number']]
opp_sol.driver_settings.update(opp_sol.merge_driver_settings(**config))
self.log.debug("Config driver %s, %s, %s", config['number'],
opp_sol.driver_settings['pulse_ms'], opp_sol.driver_settings['hold_power'])
pulse_len = int(opp_sol.driver_settings['pulse_ms'])
hold = int(opp_sol.driver_settings['hold_power'])
solIndex = int(solenoid) * OppRs232Intf.CFG_BYTES_PER_SOL
# If hold is 0, set the auto clear bit
if (hold == 0):
cmd = OppRs232Intf.CFG_SOL_AUTO_CLR
else:
cmd = chr(0)
opp_sol.solCard.currCfgLst[solIndex] = cmd
opp_sol.solCard.currCfgLst[solIndex + OppRs232Intf.INIT_KICK_OFFSET] = chr(pulse_len)
opp_sol.solCard.currCfgLst[solIndex + OppRs232Intf.DUTY_CYCLE_OFFSET] = chr(hold)
msg = []
msg.append(opp_sol.solCard.addr)
msg.append(OppRs232Intf.CFG_IND_SOL_CMD)
msg.append(chr(int(solenoid)))
msg.append(cmd)
msg.append(chr(pulse_len))
msg.append(chr(hold))
msg.append(OppRs232Intf.calc_crc8_whole_msg(msg))
msg.append(OppRs232Intf.EOM_CMD)
cmd = ''.join(msg)
def process_received_message(self, msg):
"""Sends an incoming message from the OPP hardware to the proper
method for servicing.
"""
if (len(msg) >= 1):
if ((ord(msg[0]) >= ord(OppRs232Intf.CARD_ID_GEN2_CARD)) and
(ord(msg[0]) < (ord(OppRs232Intf.CARD_ID_GEN2_CARD) + 0x20))):
if (len(msg) >= 2):
cmd = msg[1]
else:
cmd = OppRs232Intf.ILLEGAL_CMD
# Look for EOM or INV commands
elif (msg[0] == OppRs232Intf.INV_CMD) or (msg[0] == OppRs232Intf.EOM_CMD):
cmd = msg[0]
else:
cmd = OppRs232Intf.ILLEGAL_CMD
else:
# No messages received, fake an EOM
cmd = OppRs232Intf.EOM_CMD
# Can't use try since it swallows too many errors for now
if cmd in self.opp_commands:
self.opp_commands[cmd](msg)
else:
hex_string = "".join(" 0x%02x" % ord(b) for b in msg)
self.log.warning("Received unknown serial command?%s. (This is "
"very worrisome.)", "".join(" 0x%02x" % ord(b) for b in msg))
color: a 3-item list of integers representing R, G, and B values,
0-255 each.
"""
# todo this is crazy inefficient right now. todo change it so it can use
# hex strings as the color throughout
new_color = self.rgb_to_hex(color)
error = False
# Check if this color exists in the color table
if not new_color in self.neoCard.colorTableDict:
# Check if there are available spaces in the table
if (self.neoCard.numColorEntries < 32):
# Send the command to add color table entry
self.neoCard.colorTableDict[new_color] = \
chr(self.neoCard.numColorEntries + OppRs232Intf.NEO_CMD_ON)
msg = []
msg.append(self.neoCard.addr)
msg.append(OppRs232Intf.CHNG_NEO_COLOR_TBL)
msg.append(chr(self.neoCard.numColorEntries))
msg.append(chr(int(new_color[2:4],16)))
msg.append(chr(int(new_color[:2],16)))
msg.append(chr(int(new_color[-2:],16)))
msg.append(OppRs232Intf.calc_crc8_whole_msg(msg))
cmd = ''.join(msg)
self.log.debug("Add Neo color table entry: %s", "".join(" 0x%02x" % ord(b) for b in cmd))
self.neoCard.platform.opp_connection.send(cmd)
self.neoCard.numColorEntries += 1
else:
error = True
self.log.warn("Not enough Neo color table entries. "
"OPP only supports 32.")
method for servicing.
"""
if (len(msg) >= 1):
if ((ord(msg[0]) >= ord(OppRs232Intf.CARD_ID_GEN2_CARD)) and
(ord(msg[0]) < (ord(OppRs232Intf.CARD_ID_GEN2_CARD) + 0x20))):
if (len(msg) >= 2):
cmd = msg[1]
else:
cmd = OppRs232Intf.ILLEGAL_CMD
# Look for EOM or INV commands
elif (msg[0] == OppRs232Intf.INV_CMD) or (msg[0] == OppRs232Intf.EOM_CMD):
cmd = msg[0]
else:
cmd = OppRs232Intf.ILLEGAL_CMD
else:
# No messages received, fake an EOM
cmd = OppRs232Intf.EOM_CMD
# Can't use try since it swallows too many errors for now
if cmd in self.opp_commands:
self.opp_commands[cmd](msg)
else:
hex_string = "".join(" 0x%02x" % ord(b) for b in msg)
self.log.warning("Received unknown serial command?%s. (This is "
"very worrisome.)", "".join(" 0x%02x" % ord(b) for b in msg))
to update all the incandescents each loop.
Note: This could be made much more efficient by supporting a command
that simply sets the state of all 32 of the LEDs as either on or off.
"""
wholeMsg = []
for incand in self.opp_incands:
# Check if any changes have been made
if ((incand.oldState ^ incand.newState) != 0):
# Update card
incand.oldState = incand.newState
msg = []
msg.append(incand.addr)
msg.append(OppRs232Intf.INCAND_CMD)
msg.append(OppRs232Intf.INCAND_SET_ON_OFF)
msg.append(chr((incand.newState >> 24) & 0xff))
msg.append(chr((incand.newState >> 16) & 0xff))
msg.append(chr((incand.newState >> 8) & 0xff))
msg.append(chr(incand.newState & 0xff))
msg.append(OppRs232Intf.calc_crc8_whole_msg(msg))
wholeMsg.extend(msg)
if (len(wholeMsg) != 0):
wholeMsg.append(OppRs232Intf.EOM_CMD)
sendCmd = ''.join(wholeMsg)
self.opp_connection.send(sendCmd)
self.log.debug("Update incand cmd:%s", "".join(" 0x%02x" % ord(b) for b in sendCmd))
def __init__(self, addr, mask, solDict, platform):
self.log = logging.getLogger('OPPSolenoid')
self.addr = addr
self.mask = mask
self.platform = platform
self.state = 0
self.procCtl = 0
self.currCfgLst = ['\x00' for _ in range(OppRs232Intf.NUM_G2_SOL_PER_BRD *
OppRs232Intf.CFG_BYTES_PER_SOL)]
self.log.debug("Creating OPP Solenoid at hardware address: 0x%02x",
ord(addr))
card = str(ord(addr) - ord(OppRs232Intf.CARD_ID_GEN2_CARD))
for index in range(0, 16):
if (((1 << index) & mask) != 0):
number = card + '-' + str(index)
opp_sol = OPPSolenoid(self, number)
opp_sol.driver_settings = self.create_driver_settings(platform.machine)
solDict[card + '-' + str(index)] = opp_sol
Note: This could be made much more efficient by supporting a command
that simply sets the state of all 32 of the LEDs as either on or off.
"""
wholeMsg = []
for incand in self.opp_incands:
# Check if any changes have been made
if ((incand.oldState ^ incand.newState) != 0):
# Update card
incand.oldState = incand.newState
msg = []
msg.append(incand.addr)
msg.append(OppRs232Intf.INCAND_CMD)
msg.append(OppRs232Intf.INCAND_SET_ON_OFF)
msg.append(chr((incand.newState >> 24) & 0xff))
msg.append(chr((incand.newState >> 16) & 0xff))
msg.append(chr((incand.newState >> 8) & 0xff))
msg.append(chr(incand.newState & 0xff))
msg.append(OppRs232Intf.calc_crc8_whole_msg(msg))
wholeMsg.extend(msg)
if (len(wholeMsg) != 0):
wholeMsg.append(OppRs232Intf.EOM_CMD)
sendCmd = ''.join(wholeMsg)
self.opp_connection.send(sendCmd)
self.log.debug("Update incand cmd:%s", "".join(" 0x%02x" % ord(b) for b in sendCmd))
opp_sol.driver_settings.update(opp_sol.merge_driver_settings(**config))
self.log.debug("Config driver %s, %s, %s", config['number'],
opp_sol.driver_settings['pulse_ms'], opp_sol.driver_settings['hold_power'])
pulse_len = int(opp_sol.driver_settings['pulse_ms'])
hold = int(opp_sol.driver_settings['hold_power'])
solIndex = int(solenoid) * OppRs232Intf.CFG_BYTES_PER_SOL
# If hold is 0, set the auto clear bit
if (hold == 0):
cmd = OppRs232Intf.CFG_SOL_AUTO_CLR
else:
cmd = chr(0)
opp_sol.solCard.currCfgLst[solIndex] = cmd
opp_sol.solCard.currCfgLst[solIndex + OppRs232Intf.INIT_KICK_OFFSET] = chr(pulse_len)
opp_sol.solCard.currCfgLst[solIndex + OppRs232Intf.DUTY_CYCLE_OFFSET] = chr(hold)
msg = []
msg.append(opp_sol.solCard.addr)
msg.append(OppRs232Intf.CFG_IND_SOL_CMD)
msg.append(chr(int(solenoid)))
msg.append(cmd)
msg.append(chr(pulse_len))
msg.append(chr(hold))
msg.append(OppRs232Intf.calc_crc8_whole_msg(msg))
msg.append(OppRs232Intf.EOM_CMD)
cmd = ''.join(msg)
self.log.debug("Writing individual config: %s", "".join(" 0x%02x" % ord(b) for b in cmd))
self.opp_connection.send(cmd)
return (opp_sol, config['number'])
hex_string = "".join(" 0x%02x" % ord(b) for b in msg)
self.log.warning("Msg contains bad CRC:%s.", hex_string)
end = True
else:
hasNeo = False
wingIndex = 0
solMask = 0
inpMask = 0
incandMask = 0
while (wingIndex < OppRs232Intf.NUM_G2_WING_PER_BRD):
if (msg[currIndex + 2 + wingIndex] == OppRs232Intf.WING_SOL):
solMask |= (0x0f << (4 * wingIndex))
inpMask |= (0x0f << (8 * wingIndex))
elif (msg[currIndex + 2 + wingIndex] == OppRs232Intf.WING_INP):
inpMask |= (0xff << (8 * wingIndex))
elif (msg[currIndex + 2 + wingIndex] == OppRs232Intf.WING_INCAND):
incandMask |= (0xff << (8 * wingIndex))
elif (msg[currIndex + 2 + wingIndex] == OppRs232Intf.WING_NEO):
hasNeo = True
wingIndex += 1
if (incandMask != 0):
self.opp_incands.append(OPPIncandCard(msg[currIndex], incandMask, self.incandDict))
if (solMask != 0):
self.opp_solenoid.append(OPPSolenoidCard(msg[currIndex], solMask, self.solDict, self))
if (inpMask != 0):
# Create the input object, and add to the command to read all inputs
self.opp_inputs.append(OPPInput(msg[currIndex], inpMask, self.inpDict,
self.inpAddrDict, self.machine))
# Add command to read all inputs to read input message
inpMsg = []
inpMsg.append(msg[currIndex])