Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def InternalGenerateReply(self): # pylint: disable=invalid-name
if self.init_packet.cmd == hidtransport.UsbHidTransport.U2FHID_INIT:
nonce = self.init_packet.payload[0:8]
self.reply = nonce + self.cid_to_allocate + bytearray(
b'\x01\x00\x00\x00\x00')
elif self.init_packet.cmd == hidtransport.UsbHidTransport.U2FHID_PING:
self.reply = self.init_packet.payload
elif self.init_packet.cmd == hidtransport.UsbHidTransport.U2FHID_MSG:
self.reply = self.msg_reply
else:
raise UnsupportedCommandError()
UnsupportedCommandError: if the requested amount is not 64.
"""
if not self.transaction_active or not self.full_packet_received:
return None
ret = None
if self.busy_count > 0:
ret = hidtransport.UsbHidTransport.InitPacket(
64, self.init_packet.cid, hidtransport.UsbHidTransport.U2FHID_ERROR,
1, hidtransport.UsbHidTransport.ERR_CHANNEL_BUSY)
self.busy_count -= 1
elif self.reply: # reply in progress
next_frame = self.reply[0:59]
self.reply = self.reply[59:]
ret = hidtransport.UsbHidTransport.ContPacket(64, self.init_packet.cid,
self.seq, next_frame)
self.seq += 1
else:
self.InternalGenerateReply()
first_frame = self.reply[0:57]
ret = hidtransport.UsbHidTransport.InitPacket(
64, self.init_packet.cid, self.init_packet.cmd, len(self.reply),
first_frame)
self.reply = self.reply[57:]
if not self.reply: # done after this packet
self.reply = None
self.transaction_active = False
self.seq = 0
def InternalInit(self):
"""Initializes the device and obtains channel id."""
self.cid = UsbHidTransport.U2FHID_BROADCAST_CID
nonce = bytearray(os.urandom(8))
r = self.InternalExchange(UsbHidTransport.U2FHID_INIT, nonce)
if len(r) < 17:
raise OSError('unexpected init reply len')
if r[0:8] != nonce:
raise OSError('nonce mismatch')
self.cid = bytearray(r[8:12])
self.u2fhid_version = r[12]
self.device_version = tuple(r[13:16])
self.capabilities = r[16]
def InternalRecv(self):
"""Receives a message from the device, including defragmenting it."""
first_packet_read = False
while not first_packet_read:
first_read = self.InternalReadFrame()
first_packet = UsbHidTransport.InitPacket.FromWireFormat(
self.packet_size, first_read)
first_packet_read = self.cid == first_packet.cid
data = first_packet.payload
to_read = first_packet.size - len(first_packet.payload)
seq = 0
while to_read > 0:
next_read = self.InternalReadFrame()
next_packet = UsbHidTransport.ContPacket.FromWireFormat(self.packet_size,
next_read)
if self.cid != next_packet.cid:
# Skip over packets that are for communication with other clients.
# HID is broadcast, so we see potentially all communication from the
# device. For well-behaved devices, these should be BUSY messages
# sent to other clients of the device because at this point we're
# in mid-message transit.
continue
if seq != next_packet.seq:
raise OSError('Packets received out of order')
# This packet for us at this point, so debit it against our
# balance of bytes to read.
to_read -= len(next_packet.payload)
def InternalExchange(self, cmd, payload_in):
"""Sends and receives a message from the device."""
# make a copy because we destroy it below
self.logger.debug('payload: ' + str(list(payload_in)))
payload = bytearray()
payload[:] = payload_in
for _ in range(2):
self.InternalSend(cmd, payload)
ret_cmd, ret_payload = self.InternalRecv()
if ret_cmd == UsbHidTransport.U2FHID_ERROR:
if ret_payload == UsbHidTransport.ERR_CHANNEL_BUSY:
time.sleep(0.5)
continue
raise OSError('Device error: %d' % int(ret_payload[0]))
elif ret_cmd != cmd:
raise OSError('Command mismatch!')
return ret_payload
raise OSError('Device Busy. Please retry')
def DiscoverLocalHIDU2FDevices(selector=HidUsageSelector):
for d in hid.Enumerate():
if selector(d):
try:
dev = hid.Open(d['path'])
yield UsbHidTransport(dev)
except OSError:
# Insufficient permissions to access device
pass
def list_devices(cls, selector=hidtransport.HidUsageSelector):
for d in hidtransport.hid.Enumerate():
if selector(d):
try:
dev = hidtransport.hid.Open(d["path"])
yield cls(d, hidtransport.UsbHidTransport(dev))
except OSError:
# Insufficient permissions to access device
pass
def SendWink(self):
return self.InternalExchange(UsbHidTransport.U2FHID_WINK, bytearray([]))
def InternalRecv(self):
"""Receives a message from the device, including defragmenting it."""
first_packet_read = False
while not first_packet_read:
first_read = self.InternalReadFrame()
first_packet = UsbHidTransport.InitPacket.FromWireFormat(
self.packet_size, first_read)
first_packet_read = self.cid == first_packet.cid
data = first_packet.payload
to_read = first_packet.size - len(first_packet.payload)
seq = 0
while to_read > 0:
next_read = self.InternalReadFrame()
next_packet = UsbHidTransport.ContPacket.FromWireFormat(self.packet_size,
next_read)
if self.cid != next_packet.cid:
# Skip over packets that are for communication with other clients.
# HID is broadcast, so we see potentially all communication from the
# device. For well-behaved devices, these should be BUSY messages
# sent to other clients of the device because at this point we're