Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
print_hid_enumerate()
sys.exit()
self.serial_number = serial_number
device = hidapi.hid_open_path(path)
crypto = gevent.spawn(self.setup_crypto, self.serial_number)
gevent.sleep(DEVICE_POLL_INTERVAL)
console_updater = gevent.spawn(self.update_console)
while self.running:
try:
if _os_decryption:
data = hidraw.read(32)
if data != "":
self.packets.put_nowait(EmotivPacket(data, self.sensors, self.old_model))
else:
# Doesn't seem to matter how big we make the buffer 32 returned every time, 33 for other platforms
data = hidapi.hid_read(device, 34)
if len(data) == 32:
# Most of the time the 0 is truncated? That's ok we'll add it... Could probably not do this...
data.insert(0, 0)
if data != "":
if _os_decryption:
self.packets.put_nowait(EmotivPacket(data, self.sensors, self.old_model))
else:
# Queue it!
if self.write and self.write_raw:
self.write_data(data)
tasks.put_nowait(''.join(map(chr, data[1:])))
self.packets_received += 1
gevent.sleep(DEVICE_POLL_INTERVAL)
except KeyboardInterrupt:
self.running = False
except Exception, ex:
def cor(self) -> bool:
    """
    [RX] Report whether the Radio is currently signalling.

    Reads one HID report of ``roip.READ_SIZE`` bytes from
    ``self.hid_device`` and looks for the COR markers in it. Both the raw
    report and the resulting status are logged at debug level.

    Presumably blocks until the Radio delivers a report (this depends on
    how ``hidapi.hid_read`` is configured) -- TODO confirm.

    Returns:
        True when ``roip.COR_START`` is present in the report; False when
        ``roip.COR_STOP`` is present or when neither marker is found.
    """
    report = hidapi.hid_read(self.hid_device, roip.READ_SIZE)
    self._logger.debug('read="%s"', report)

    # The start marker takes precedence over the stop marker.
    if roip.COR_START in report:
        signal_present = True
    elif roip.COR_STOP in report:
        signal_present = False
    else:
        # No marker at all is treated the same as "no signal".
        signal_present = False

    self._logger.debug('cor_status="%s"', signal_present)
    return signal_present
def readFirmwareVersion(self):
    """Request the firmware version from the device and return it as text.

    Increments the sequence number ``self.sn`` (wrapping back to 1 once it
    exceeds 255), writes a 10-byte request whose first three bytes are
    0xAA, ``HASSEB_READ_FIRMWARE_VERSION`` and the sequence number, and
    then reads up to 100 reports looking for the matching reply.

    Returns:
        ``"<reply[3]>.<reply[4]>"`` from the first 10-byte report whose
        second byte equals ``HASSEB_READ_FIRMWARE_VERSION`` (presumably
        major.minor -- confirm against the device protocol), or the
        string ``"VERSION_ERROR"`` when none of the reports matches.
    """
    self.sn += 1
    if self.sn > 255:
        self.sn = 1

    request = struct.pack('BBBBBBBBBB', 0xAA, HASSEB_READ_FIRMWARE_VERSION,
                          self.sn, 0, 0, 0, 0, 0, 0, 0)
    hidapi.hid_write(self.device, request)

    # Read, then check, on every attempt: each report that is pulled from
    # the device is inspected, so no trailing report is fetched and then
    # silently thrown away when the retry budget runs out.
    for _ in range(100):
        reply = hidapi.hid_read(self.device, 10)
        if len(reply) == 10 and reply[1] == HASSEB_READ_FIRMWARE_VERSION:
            return f"{reply[3]}.{reply[4]}"

    return "VERSION_ERROR"
def receive(self):
    """Read one 10-byte report from the device and settle any pending request.

    The raw report is decoded with ``self.extract``. When a request is
    pending (``self._pending`` is truthy) and the decoded frame is a
    ``BackwardFrame``, the raw report is stored in
    ``self._response_message``; when it is a ``HassebDALIUSBNoAnswer``,
    ``None`` is stored instead. In both cases ``self._pending`` is cleared.

    Returns:
        None when the decoded frame is ``HassebDALIUSBNoDataAvailable``,
        otherwise the raw report as returned by ``hidapi.hid_read``.
    """
    raw = hidapi.hid_read(self.device, 10)
    decoded = self.extract(raw)

    if isinstance(decoded, HassebDALIUSBNoDataAvailable):
        return

    is_reply = isinstance(decoded, BackwardFrame)
    if (is_reply or isinstance(decoded, HassebDALIUSBNoAnswer)) and self._pending:
        # A real backward frame carries the answer; "no answer" resolves
        # the pending request with an empty response.
        self._response_message = raw if is_reply else None
        self._pending = None

    # NOTE(review): every frame other than "no data available" is assumed
    # to fall through to this return -- confirm against the original layout.
    return raw
count += 1
if count == 100:
led_state = bytearray(("82" + ''.join(format(random.randint(0,255), '02x') for _ in range(31))).decode("hex"))
hidapi.hid_write(device, led_state)
# 16 rgb buttons
pad_state = bytearray(("80" + ''.join(format(random.randint(0,255), '02x') for _ in range(16 * 3))).decode("hex"))
hidapi.hid_write(device, pad_state)
# 8 group rgb_1 + 8 group rgb_2 + 8 transport buttons
group_other_btn_state = bytearray(("81" + ''.join(format(random.randint(0,255), '02x') for _ in range(((8 + 8) * 3) + 8))).decode("hex"))
hidapi.hid_write(device, group_other_btn_state)
count = 0
result = hidapi.hid_read(device, 256)
state = binascii.hexlify(result)
if state != quiet_state:
print "#%d: %s" % (len(result), state)
hidapi.hid_close(device)