Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _open(args):
device = args.device
if args.hidpp and not device:
for d in _hid.enumerate(vendor_id=0x046d):
if d.driver == 'logitech-djreceiver':
device = d.path
break
if not device:
sys.exit("!! No HID++ receiver found.")
if not device:
sys.exit("!! Device path required.")
print (".. Opening device", device)
handle = _hid.open_path(device)
if not handle:
sys.exit("!! Failed to open %s, aborting." % device)
print (".. Opened handle %r, vendor %r product %r serial %r." % (
handle,
_hid.get_manufacturer(handle),
else:
time.sleep(0.00001)
self.lock.acquire()
if self._stop_signal:
print("Reader stopping...")
self.running = False
if self.file is not None:
self.file.close()
if type(source) != int and type(source) != list:
source.close()
if self.hid is not None:
if type(self.hid) != int:
self.hid.close()
if system_platform != "Windows":
try:
hidapi.hid_close(source)
except Exception:
pass
try:
hidapi.hid_exit()
except Exception:
pass
print("Reader stopped...")
self.stopped = True
return
# Queue it!
if self.write and self.write_raw:
self.write_data(data)
tasks.put_nowait(''.join(map(chr, data[1:])))
self.packets_received += 1
gevent.sleep(DEVICE_POLL_INTERVAL)
except KeyboardInterrupt:
self.running = False
except Exception, ex:
print("Setup emotiv.py(line=710): %s" % ex.message)
self.running = False
gevent.sleep(DEVICE_POLL_INTERVAL)
if _os_decryption:
hidraw.close()
else:
hidapi.hid_close(device)
gevent.kill(crypto, KeyboardInterrupt)
gevent.kill(console_updater, KeyboardInterrupt)
print_hid_enumerate()
sys.exit()
self.serial_number = serial_number
device = hidapi.hid_open_path(path)
crypto = gevent.spawn(self.setup_crypto, self.serial_number)
gevent.sleep(DEVICE_POLL_INTERVAL)
console_updater = gevent.spawn(self.update_console)
while self.running:
try:
if _os_decryption:
data = hidraw.read(32)
if data != "":
self.packets.put_nowait(EmotivPacket(data, self.sensors, self.old_model))
else:
# Doesn't seem to matter how big we make the buffer 32 returned every time, 33 for other platforms
data = hidapi.hid_read(device, 34)
if len(data) == 32:
# Most of the time the 0 is truncated? That's ok we'll add it... Could probably not do this...
data.insert(0, 0)
if data != "":
if _os_decryption:
self.packets.put_nowait(EmotivPacket(data, self.sensors, self.old_model))
else:
# Queue it!
if self.write and self.write_raw:
self.write_data(data)
tasks.put_nowait(''.join(map(chr, data[1:])))
self.packets_received += 1
gevent.sleep(DEVICE_POLL_INTERVAL)
except KeyboardInterrupt:
self.running = False
except Exception, ex:
#!/usr/bin/env python
# Script setup: initialise hidapi, check that a device matching 0x17cc / 0x1140
# is attached, open it, and define helpers that hex-decode display lines and
# write them to the opened device.
import hidapi
import binascii
import time
# hidapi is initialised before any other hidapi call is made.
hidapi.hid_init()
print 'Loaded hidapi library from: {:s}\n'.format(hidapi.hid_lib_path())
# 0x17cc / 0x1140 are presumably the USB vendor and product IDs -- confirm
# against the hidapi binding's hid_enumerate signature.
devices = hidapi.hid_enumerate(0x17cc, 0x1140)
# When nothing matching is enumerated, report it and exit with status 1.
if len(devices) == 0:
print "No maschine attached"
exit(1)
# Handle used by every hid_write call below.
device = hidapi.hid_open(0x17cc, 0x1140)
# Hex-string constant; it is not referenced in the visible part of this script.
quiet_state = "20000000100020003000400050006000700080009000a000b000c000d000e000f0000000100020003000400050006000700080009000a000b000c000d000e000f0"
# For each line of `display`: drop the spaces, hex-decode the rest (Python 2
# str.decode("hex")) and wrap the result in a bytearray.
to_bytearray = lambda display: [bytearray(line.replace(" ", "").decode("hex")) for line in display]
# Write every decoded line of `display` to `device`; evaluates to the list of
# hid_write return values.
write_display = lambda display: [hidapi.hid_write(device, line) for line in to_bytearray(display)]
def clear_display(no):
    """Write eight zero-filled reports to display number ``no``.

    Each report is 265 bytes long: an 8-byte header followed by 257 zero
    bytes. The header is ``e<no> 00 00 <offset> 00 20 00 08`` where
    ``<offset>`` steps through 0, 8, 16, ... 56 (``8 * i``). The meaning of
    the individual header bytes is not shown in this script -- presumably the
    offset selects a band of rows on the display; confirm against the device
    protocol.

    ``no`` must be a single-digit integer so that the header stays an even
    number of hex characters; it is formatted with ``%d``.

    The reports are sent through the module-level ``hidapi`` binding to the
    module-level ``device`` handle.
    """
    # The zero payload is identical for every report, so build it once.
    padding = "00" * (265 - 8)
    for i in range(8):
        header = "e%d0000%02x00200008" % (no, 8 * i)
        # binascii.unhexlify behaves the same on Python 2 and Python 3,
        # unlike str.decode("hex"), which exists only on Python 2.
        hidapi.hid_write(device, bytearray(binascii.unhexlify(header + padding)))
# 32-byte buffer: 0x82, then 24 bytes of 0x0a, then 7 bytes of 0x3f.
# Presumably an LED report with 0x82 as its report id -- confirm against the
# device protocol; the visible code never sends it.
led_state = bytearray("820a0a0a 0a0a0a0a 0a0a0a0a 0a0a0a0a 0a0a0a0a 0a0a0a0a 0a3f3f3f 3f3f3f3f".replace(" ", "").decode("hex"))
def __init__(self, file_name=None, mode="hid", hid=None, file=None, new_format=False, **kwargs):
self.mode = mode
self.file = file
self.file_name = file_name
self.hid = hid
self.new_format = new_format
self.platform = system_platform
self.serial_number = None
self.lock = Lock()
if self.platform != "Windows":
hidapi.hid_init()
self.setup_platform = {
'Windows': self.setup_windows,
'Darwin': self.setup_not_windows,
'Linux': self.setup_not_windows,
'Reader': self.setup_reader,
}
if self.mode == "csv":
if file_name is None:
raise ValueError("CSV file name must be specified when initializing an EmotivReader class using mode "
"'csv'.")
if sys.version_info >= (3, 0):
self.file = open(file_name, 'r')
else:
self.file = open(file_name, 'rb')
contents = self.file.read().split('\n')
#!/usr/bin/env python
# Script setup: initialise hidapi, check that a device matching 0x17cc / 0x1140
# is attached, open it, and define helpers that hex-decode display lines and
# write them to the opened device.
import hidapi
import binascii
import time
# hidapi is initialised before any other hidapi call is made.
hidapi.hid_init()
print 'Loaded hidapi library from: {:s}\n'.format(hidapi.hid_lib_path())
# 0x17cc / 0x1140 are presumably the USB vendor and product IDs -- confirm
# against the hidapi binding's hid_enumerate signature.
devices = hidapi.hid_enumerate(0x17cc, 0x1140)
# When nothing matching is enumerated, report it and exit with status 1.
if len(devices) == 0:
print "No maschine attached"
exit(1)
# Handle used by every hid_write call below.
device = hidapi.hid_open(0x17cc, 0x1140)
# Hex-string constant; it is not referenced in the visible part of this script.
quiet_state = "20000000100020003000400050006000700080009000a000b000c000d000e000f0000000100020003000400050006000700080009000a000b000c000d000e000f0"
# For each line of `display`: drop the spaces, hex-decode the rest (Python 2
# str.decode("hex")) and wrap the result in a bytearray.
to_bytearray = lambda display: [bytearray(line.replace(" ", "").decode("hex")) for line in display]
# Write every decoded line of `display` to `device`; evaluates to the list of
# hid_write return values.
write_display = lambda display: [hidapi.hid_write(device, line) for line in to_bytearray(display)]
def clear_display(no):
Modified version of: https://github.com/openyou/emokit/blob/13512c5e078d0ff321709a31f19377fc9b7e18a1/python/emokit/emotiv.py
Use this script, as long as the current `master` of the emokit API is unstable.
'''
import os
import platform
import sys
system_platform = platform.system()
if system_platform == "Windows":
import pywinusb.hid as hid
else:
import hidapi #@UnresolvedImport
hidapi.hid_init()
import gevent
from Crypto.Cipher import AES
from Crypto import Random
from gevent.queue import Queue
from datetime import datetime
import csv
# How long to gevent-sleep if there is no data on the EEG.
# To be precise, this is not the frequency to poll on the input device
# (which happens with a blocking read), but how often the gevent thread
# polls the real threading queue that reads the data in a separate thread
# to not block gevent with the file read().
# This is the main latency control.
# Setting it to 1ms takes about 10% CPU on a Core i5 mobile.
# You can set this higher to reduce idle CPU usage; it has no effect
def open(self) -> None:
    """Open the connection to the RoIP device.

    Initialises hidapi, passes the first two items of ``self.device`` to
    ``hidapi.hid_open``, stores the result on ``self.hid_device`` and logs it
    at INFO level.
    """
    hidapi.hid_init()
    first_id, second_id = self.device[0], self.device[1]
    self.hid_device = hidapi.hid_open(first_id, second_id)
    self._logger.info('Opened hid_device="%s"', self.hid_device)
def __init__(self, file_name=None, mode="hid", hid=None, file=None, **kwargs):
self.mode = mode
self.file = file
self.file_name = file_name
self.hid = hid
self.platform = system_platform
self.serial_number = None
self.lock = Lock()
if self.platform != "Windows":
hidapi.hid_init()
self.setup_platform = {
'Windows': self.setup_windows,
'Darwin': self.setup_not_windows,
'Linux': self.setup_not_windows,
'Reader': self.setup_reader,
}
if self.mode == "csv":
if file_name is None:
raise ValueError("CSV file name must be specified when initializing an EmotivReader class using mode "
"'csv'.")
if sys.version_info >= (3, 0):
self.file = open(file_name, 'r')
else:
self.file = open(file_name, 'rb')
self.reader = csv.reader(self.file, quoting=csv.QUOTE_ALL)