Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
line = data[start:end] if end else data[start:]
line = line.rstrip()
if b':' in line:
hdr, value = line.split(b':', 1)
headers[hdr.strip()] = value.strip()
else:
break
start = end if end != 0 else len(data)
try:
return headers, binascii.a2b_base64(data[start:])
except binascii.Error:
raise KeyImportError('Invalid PEM data') from None
def _parse_openssh(data):
    """Parse an OpenSSH format public key or certificate

       The input is split on whitespace into at most three fields:
       the algorithm name, the base64-encoded key data, and an
       optional trailing comment.

       Returns a tuple of (algorithm, comment, decoded key data),
       where comment is None when no third field is present.

       Raises KeyImportError if fewer than two fields are present,
       if the algorithm is in neither the public key nor the
       certificate algorithm map, or if the base64 data is invalid.

    """

    fields = data.split(None, 2)

    if len(fields) < 2:
        raise KeyImportError('Invalid OpenSSH public key or certificate')

    algorithm, encoded = fields[0], fields[1]
    comment = fields[2] if len(fields) > 2 else None

    recognized = (algorithm in _public_key_alg_map or
                  algorithm in _certificate_alg_map)

    if not recognized:
        raise KeyImportError('Unknown OpenSSH public key algorithm')

    try:
        key_data = binascii.a2b_base64(encoded)
    except binascii.Error:
        # Hide the low-level base64 error behind the import error
        raise KeyImportError('Invalid OpenSSH public key '
                             'or certificate') from None

    return algorithm, comment, key_data
def _parse_openssh(data):
    """Parse an OpenSSH format public key or certificate

       The input is split on whitespace into at most three fields:
       the algorithm name, the base64-encoded key data, and an
       optional trailing comment.

       Returns a tuple of (algorithm, comment, decoded key data),
       where comment is None when no third field is present.

       Raises KeyImportError if fewer than two fields are present,
       if the algorithm is in neither the public key nor the
       certificate algorithm map, or if the base64 data is invalid.

    """

    fields = data.split(None, 2)

    if len(fields) < 2:
        raise KeyImportError('Invalid OpenSSH public key or certificate')

    algorithm, encoded = fields[0], fields[1]
    comment = fields[2] if len(fields) > 2 else None

    recognized = (algorithm in _public_key_alg_map or
                  algorithm in _certificate_alg_map)

    if not recognized:
        raise KeyImportError('Unknown OpenSSH public key algorithm')

    try:
        key_data = binascii.a2b_base64(encoded)
    except binascii.Error:
        # Hide the low-level base64 error behind the import error
        raise KeyImportError('Invalid OpenSSH public key '
                             'or certificate') from None

    return algorithm, comment, key_data
else:
alg, alg_params = key_data[0][0], OMIT
handler = _pkcs8_oid_map.get(alg)
if handler is None:
raise KeyImportError('Unknown PKCS#8 algorithm')
key_params = handler.decode_pkcs8_public(alg_params, key_data[1].value)
if key_params is None:
raise KeyImportError('Invalid %s public key' %
handler.pem_name.decode('ascii')
if handler.pem_name else 'PKCS#8')
return handler.make_public(*key_params)
else:
raise KeyImportError('Invalid PKCS#8 public key')
if not 0 < d < n:
raise KeyImportError('Invalid private key')
if public_key:
try:
Q = _PrimePoint.decode(G.curve, public_key)
if not Q or n * Q:
raise KeyImportError('Invalid public key')
except ValueError:
raise KeyImportError('Invalid public key') from None
if d:
if Q:
if d * G != Q:
raise KeyImportError('Public and private key don\'t match')
else:
Q = d * G
elif not Q:
raise KeyImportError('No keys specified')
self.algorithm = algorithm
self._alg_id = alg_id
self._alg_oid = alg_oid
self._hash_alg = hash_alg
self._G = G
self._n = n
self._d = d
self._Q = Q
packet = SSHPacket(data)
alg = packet.get_string()
handler = _public_key_alg_map.get(alg)
if handler:
key_params = handler.decode_ssh_public(packet)
packet.check_end()
key = handler.make_public(*key_params)
key.algorithm = alg
return key
else:
raise KeyImportError('Unknown key algorithm: %s' %
alg.decode('ascii', errors='replace'))
except PacketDecodeError:
raise KeyImportError('Invalid public key') from None
def _decode_options(options, decoders, critical=True):
    """Decode options found in this certificate

       The encoded options are read as a sequence of name string and
       data string pairs. For each name with an entry in decoders,
       the matching decoder is called on a packet wrapping the data
       and its result is stored under the ASCII-decoded name.

       An unrecognized name raises KeyImportError when critical is
       true. When critical is false, the unrecognized option and
       its data are skipped.

       Returns a dict mapping option names to decoded values.

    """

    packet = SSHPacket(options)
    result = {}

    while packet:
        name = packet.get_string()
        decoder = decoders.get(name)

        if not decoder and critical:
            raise KeyImportError('Unrecognized critical option: %s' %
                                 name.decode('ascii', errors='replace'))

        # Always consume the data which accompanies the name, even
        # when the option is unrecognized. Otherwise, the skipped
        # option's data would be read as the name of the next option
        # and could be mistaken for a recognized one.
        data = packet.get_string()

        if decoder:
            data_packet = SSHPacket(data)
            result[name.decode('ascii')] = decoder(data_packet)

            # The decoder must use all of the option's data
            data_packet.check_end()

    return result
def _decode_openssh_private(data, passphrase):
"""Decode an OpenSSH format private key"""
try:
if not data.startswith(_OPENSSH_KEY_V1):
raise KeyImportError('Unrecognized OpenSSH private key type')
data = data[len(_OPENSSH_KEY_V1):]
packet = SSHPacket(data)
cipher_name = packet.get_string()
kdf = packet.get_string()
kdf_data = packet.get_string()
nkeys = packet.get_uint32()
_ = packet.get_string() # public_key
key_data = packet.get_string()
mac = packet.get_remaining_payload()
if nkeys != 1:
raise KeyImportError('Invalid OpenSSH private key')
if cipher_name != b'none':
def _decode_pkcs8_private(key_data):
    """Decode a PKCS#8 format private key

       The key data must be a tuple of at least three elements: a
       first element of 0 or 1, an algorithm tuple holding an
       algorithm identifier optionally followed by parameters, and
       the private key as bytes. When the algorithm tuple has no
       parameters, OMIT is passed to the handler in their place.

       Returns the result of the matching handler's make_private.

       Raises KeyImportError if the structure is not as described,
       if no handler is registered for the algorithm, or if the
       handler is unable to decode the key.

    """

    if not (isinstance(key_data, tuple) and len(key_data) >= 3 and
            key_data[0] in (0, 1) and isinstance(key_data[1], tuple) and
            1 <= len(key_data[1]) <= 2 and isinstance(key_data[2], bytes)):
        raise KeyImportError('Invalid PKCS#8 private key')

    if len(key_data[1]) == 2:
        alg, alg_params = key_data[1]
    else:
        alg, alg_params = key_data[1][0], OMIT

    handler = _pkcs8_oid_map.get(alg)
    if handler is None:
        raise KeyImportError('Unknown PKCS#8 algorithm')

    key_params = handler.decode_pkcs8_private(alg_params, key_data[2])
    if key_params is None:
        # Choose the key type name before formatting. The % operator
        # binds more tightly than a conditional expression, so doing
        # both in one expression would reduce the whole message to
        # 'PKCS#8' for handlers which have no PEM name.
        key_type = (handler.pem_name.decode('ascii')
                    if handler.pem_name else 'PKCS#8')

        raise KeyImportError('Invalid %s private key' % key_type)

    return handler.make_private(*key_params)