Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from asyncssh.asn1 import TaggedDERObject
from asyncssh.crypto import chacha_available, ed25519_available, ed448_available
from asyncssh.packet import MPInt, String, UInt32
from asyncssh.pbe import pkcs1_decrypt
from asyncssh.public_key import CERT_TYPE_USER, CERT_TYPE_HOST, SSHKey
from asyncssh.public_key import SSHX509CertificateChain
from asyncssh.public_key import decode_ssh_certificate
from asyncssh.public_key import get_public_key_algs, get_certificate_algs
from asyncssh.public_key import get_x509_certificate_algs
from asyncssh.public_key import import_certificate_subject
from .util import bcrypt_available, x509_available
from .util import make_certificate, run, TempDirTestCase
# Object identifiers (OIDs) for password-based encryption schemes and ciphers.
# NOTE(review): going by the names and OID arcs, these look like the PKCS #5
# PBES1 SHA1/DES scheme, the PKCS #12 40-bit RC4 scheme, PBES2, PBKDF2, and
# the AES-128 and triple-DES cipher identifiers - confirm against RFC 8018
# and RFC 7292.
_ES1_SHA1_DES = ObjectIdentifier('1.2.840.113549.1.5.10')
_P12_RC4_40 = ObjectIdentifier('1.2.840.113549.1.12.1.2')
_ES2 = ObjectIdentifier('1.2.840.113549.1.5.13')
_ES2_PBKDF2 = ObjectIdentifier('1.2.840.113549.1.5.12')
_ES2_AES128 = ObjectIdentifier('2.16.840.1.101.3.4.1.2')
_ES2_DES3 = ObjectIdentifier('1.2.840.113549.3.7')
# Ask the "openssl" command for its version banner. If the command fails
# with CalledProcessError, fall back to an empty banner so the flags below
# report OpenSSL as unavailable.
try:
    _openssl_version = run('openssl version')
except subprocess.CalledProcessError: # pragma: no cover
    _openssl_version = b''

# A non-empty banner means the command ran successfully
_openssl_available = _openssl_version != b''

# The openssl "-v2prf" option is only available in OpenSSL 1.0.2 or later
# (decided by a plain byte-wise comparison against the version banner)
_openssl_supports_v2prf = _openssl_version >= b'OpenSSL 1.0.2'
(BitString(b'\x80'), '03020080'),
(BitString(b'\x80\x00', 7), '0303078000'),
(BitString(''), '030100'),
(BitString('0'), '03020700'),
(BitString('1'), '03020780'),
(BitString('10'), '03020680'),
(BitString('10000000'), '03020080'),
(BitString('10000001'), '03020081'),
(BitString('100000000'), '0303078000'),
(IA5String(b''), '1600'),
(IA5String(b'\0'), '160100'),
(IA5String(b'abc'), '1603616263'),
(ObjectIdentifier('0.0'), '060100'),
(ObjectIdentifier('1.2'), '06012a'),
(ObjectIdentifier('1.2.840'), '06032a8648'),
(ObjectIdentifier('2.5'), '060155'),
(ObjectIdentifier('2.40'), '060178'),
(TaggedDERObject(0, None), 'a0020500'),
(TaggedDERObject(1, None), 'a1020500'),
(TaggedDERObject(32, None), 'bf20020500'),
(TaggedDERObject(128, None), 'bf8100020500'),
(TaggedDERObject(0, None, PRIVATE), 'e0020500'),
(RawDERObject(0, b'', PRIVATE), 'c000')
]
encode_errors = [
(range, [1]), # Unsupported type
from asyncssh.packet import MPInt, String, UInt32
from asyncssh.pbe import pkcs1_decrypt
from asyncssh.public_key import CERT_TYPE_USER, CERT_TYPE_HOST, SSHKey
from asyncssh.public_key import SSHX509CertificateChain
from asyncssh.public_key import decode_ssh_certificate
from asyncssh.public_key import get_public_key_algs, get_certificate_algs
from asyncssh.public_key import get_x509_certificate_algs
from asyncssh.public_key import import_certificate_subject
from .util import bcrypt_available, x509_available
from .util import make_certificate, run, TempDirTestCase
# Object identifiers (OIDs) for password-based encryption schemes and ciphers.
# NOTE(review): going by the names and OID arcs, these look like the PKCS #5
# PBES1 SHA1/DES scheme, the PKCS #12 40-bit RC4 scheme, PBES2, PBKDF2, and
# the AES-128 and triple-DES cipher identifiers - confirm against RFC 8018
# and RFC 7292.
_ES1_SHA1_DES = ObjectIdentifier('1.2.840.113549.1.5.10')
_P12_RC4_40 = ObjectIdentifier('1.2.840.113549.1.12.1.2')
_ES2 = ObjectIdentifier('1.2.840.113549.1.5.13')
_ES2_PBKDF2 = ObjectIdentifier('1.2.840.113549.1.5.12')
_ES2_AES128 = ObjectIdentifier('2.16.840.1.101.3.4.1.2')
_ES2_DES3 = ObjectIdentifier('1.2.840.113549.3.7')
# Ask the "openssl" command for its version banner. If the command fails
# with CalledProcessError, fall back to an empty banner so the flags below
# report OpenSSL as unavailable.
try:
    _openssl_version = run('openssl version')
except subprocess.CalledProcessError: # pragma: no cover
    _openssl_version = b''

# A non-empty banner means the command ran successfully
_openssl_available = _openssl_version != b''

# The openssl "-v2prf" option is only available in OpenSSL 1.0.2 or later
# (decided by a plain byte-wise comparison against the version banner)
_openssl_supports_v2prf = _openssl_version >= b'OpenSSL 1.0.2'
try:
if sys.platform != 'win32':
from hashlib import md5, sha1
from .asn1 import ASN1DecodeError, ObjectIdentifier, der_encode, der_decode
from .crypto import BasicCipher, get_cipher_params, pbkdf2_hmac
# Object identifiers (OIDs) for the encryption schemes, ciphers, key
# derivation function, and hash functions named by the constants below.
# NOTE(review): going by the name prefixes, _ES1_* look like PKCS #5 PBES1
# schemes, _P12_* PKCS #12 schemes, and _ES2_* the PBES2 scheme with its
# ciphers, PBKDF2, and HMAC hashes - confirm the arcs against RFC 8018 and
# RFC 7292.
# pylint: disable=bad-whitespace
_ES1_MD5_DES = ObjectIdentifier('1.2.840.113549.1.5.3')
_ES1_SHA1_DES = ObjectIdentifier('1.2.840.113549.1.5.10')
_ES2 = ObjectIdentifier('1.2.840.113549.1.5.13')
_P12_RC4_128 = ObjectIdentifier('1.2.840.113549.1.12.1.1')
_P12_RC4_40 = ObjectIdentifier('1.2.840.113549.1.12.1.2')
_P12_DES3 = ObjectIdentifier('1.2.840.113549.1.12.1.3')
_P12_DES2 = ObjectIdentifier('1.2.840.113549.1.12.1.4')
_ES2_CAST128 = ObjectIdentifier('1.2.840.113533.7.66.10')
_ES2_DES3 = ObjectIdentifier('1.2.840.113549.3.7')
_ES2_BF = ObjectIdentifier('1.3.6.1.4.1.3029.1.2')
_ES2_DES = ObjectIdentifier('1.3.14.3.2.7')
_ES2_AES128 = ObjectIdentifier('2.16.840.1.101.3.4.1.2')
_ES2_AES192 = ObjectIdentifier('2.16.840.1.101.3.4.1.22')
_ES2_AES256 = ObjectIdentifier('2.16.840.1.101.3.4.1.42')
_ES2_PBKDF2 = ObjectIdentifier('1.2.840.113549.1.5.12')
_ES2_SHA1 = ObjectIdentifier('1.2.840.113549.2.7')
_ES2_SHA224 = ObjectIdentifier('1.2.840.113549.2.8')
_ES2_SHA256 = ObjectIdentifier('1.2.840.113549.2.9')
# Object identifiers (OIDs) for the encryption schemes, ciphers, key
# derivation function, and hash functions named by the constants below.
# NOTE(review): going by the name prefixes, _ES1_* look like PKCS #5 PBES1
# schemes, _P12_* PKCS #12 schemes, and _ES2_* the PBES2 scheme with its
# ciphers, PBKDF2, and HMAC hashes - confirm the arcs against RFC 8018 and
# RFC 7292.
# pylint: disable=bad-whitespace
_ES1_MD5_DES = ObjectIdentifier('1.2.840.113549.1.5.3')
_ES1_SHA1_DES = ObjectIdentifier('1.2.840.113549.1.5.10')
_ES2 = ObjectIdentifier('1.2.840.113549.1.5.13')
_P12_RC4_128 = ObjectIdentifier('1.2.840.113549.1.12.1.1')
_P12_RC4_40 = ObjectIdentifier('1.2.840.113549.1.12.1.2')
_P12_DES3 = ObjectIdentifier('1.2.840.113549.1.12.1.3')
_P12_DES2 = ObjectIdentifier('1.2.840.113549.1.12.1.4')
_ES2_CAST128 = ObjectIdentifier('1.2.840.113533.7.66.10')
_ES2_DES3 = ObjectIdentifier('1.2.840.113549.3.7')
_ES2_BF = ObjectIdentifier('1.3.6.1.4.1.3029.1.2')
_ES2_DES = ObjectIdentifier('1.3.14.3.2.7')
_ES2_AES128 = ObjectIdentifier('2.16.840.1.101.3.4.1.2')
_ES2_AES192 = ObjectIdentifier('2.16.840.1.101.3.4.1.22')
_ES2_AES256 = ObjectIdentifier('2.16.840.1.101.3.4.1.42')
_ES2_PBKDF2 = ObjectIdentifier('1.2.840.113549.1.5.12')
_ES2_SHA1 = ObjectIdentifier('1.2.840.113549.2.7')
_ES2_SHA224 = ObjectIdentifier('1.2.840.113549.2.8')
_ES2_SHA256 = ObjectIdentifier('1.2.840.113549.2.9')
_ES2_SHA384 = ObjectIdentifier('1.2.840.113549.2.10')
_ES2_SHA512 = ObjectIdentifier('1.2.840.113549.2.11')
# pylint: enable=bad-whitespace
# Object identifiers (OIDs) for the encryption schemes, ciphers, key
# derivation function, and hash functions named by the constants below.
# NOTE(review): going by the name prefixes, _P12_* look like PKCS #12
# schemes and _ES2_* the PBES2 ciphers, PBKDF2, and HMAC hashes - confirm
# the arcs against RFC 8018 and RFC 7292.
_P12_RC4_40 = ObjectIdentifier('1.2.840.113549.1.12.1.2')
_P12_DES3 = ObjectIdentifier('1.2.840.113549.1.12.1.3')
_P12_DES2 = ObjectIdentifier('1.2.840.113549.1.12.1.4')
_ES2_CAST128 = ObjectIdentifier('1.2.840.113533.7.66.10')
_ES2_DES3 = ObjectIdentifier('1.2.840.113549.3.7')
_ES2_BF = ObjectIdentifier('1.3.6.1.4.1.3029.1.2')
_ES2_DES = ObjectIdentifier('1.3.14.3.2.7')
_ES2_AES128 = ObjectIdentifier('2.16.840.1.101.3.4.1.2')
_ES2_AES192 = ObjectIdentifier('2.16.840.1.101.3.4.1.22')
_ES2_AES256 = ObjectIdentifier('2.16.840.1.101.3.4.1.42')
_ES2_PBKDF2 = ObjectIdentifier('1.2.840.113549.1.5.12')
_ES2_SHA1 = ObjectIdentifier('1.2.840.113549.2.7')
_ES2_SHA224 = ObjectIdentifier('1.2.840.113549.2.8')
_ES2_SHA256 = ObjectIdentifier('1.2.840.113549.2.9')
_ES2_SHA384 = ObjectIdentifier('1.2.840.113549.2.10')
_ES2_SHA512 = ObjectIdentifier('1.2.840.113549.2.11')
# pylint: enable=bad-whitespace
# Module-level lookup tables, all starting out empty.
# NOTE(review): presumably filled in by registration code elsewhere in this
# module, mapping PKCS#1 / PKCS#8 / PBES2 cipher and KDF identifiers to
# their handlers - confirm against the code that populates them.
_pkcs1_cipher = {}
_pkcs1_dek_name = {}
_pkcs8_handler = {}
_pkcs8_cipher_oid = {}
_pbes2_cipher = {}
_pbes2_cipher_oid = {}
_pbes2_kdf = {}
# Ron Frederick - initial implementation, API, and documentation
"""Asymmetric key password based encryption functions"""
import os
from hashlib import md5, sha1
from .asn1 import ASN1DecodeError, ObjectIdentifier, der_encode, der_decode
from .crypto import BasicCipher, get_cipher_params, pbkdf2_hmac
# Object identifiers (OIDs) for the encryption schemes and ciphers named
# by the constants below.
# NOTE(review): going by the name prefixes, _ES1_* look like PKCS #5 PBES1
# schemes, _P12_* PKCS #12 schemes, and _ES2_* the PBES2 scheme with its
# ciphers - confirm the arcs against RFC 8018 and RFC 7292.
# pylint: disable=bad-whitespace
_ES1_MD5_DES = ObjectIdentifier('1.2.840.113549.1.5.3')
_ES1_SHA1_DES = ObjectIdentifier('1.2.840.113549.1.5.10')
_ES2 = ObjectIdentifier('1.2.840.113549.1.5.13')
_P12_RC4_128 = ObjectIdentifier('1.2.840.113549.1.12.1.1')
_P12_RC4_40 = ObjectIdentifier('1.2.840.113549.1.12.1.2')
_P12_DES3 = ObjectIdentifier('1.2.840.113549.1.12.1.3')
_P12_DES2 = ObjectIdentifier('1.2.840.113549.1.12.1.4')
_ES2_CAST128 = ObjectIdentifier('1.2.840.113533.7.66.10')
_ES2_DES3 = ObjectIdentifier('1.2.840.113549.3.7')
_ES2_BF = ObjectIdentifier('1.3.6.1.4.1.3029.1.2')
_ES2_DES = ObjectIdentifier('1.3.14.3.2.7')
_ES2_AES128 = ObjectIdentifier('2.16.840.1.101.3.4.1.2')
_ES2_AES192 = ObjectIdentifier('2.16.840.1.101.3.4.1.22')
_ES2_AES256 = ObjectIdentifier('2.16.840.1.101.3.4.1.42')
from .public_key import KeyImportError, KeyExportError
from .public_key import register_public_key_alg, register_certificate_alg
from .public_key import register_x509_certificate_alg
# OID for EC prime fields
PRIME_FIELD = ObjectIdentifier('1.2.840.10045.1.1')
# Lookup tables for curve algorithm OIDs, both starting out empty.
# _alg_oids is indexed by a key's curve_id (see _ECKey.__init__).
# NOTE(review): _alg_oid_map is presumably the reverse mapping (OID to
# curve id) - confirm against the code that fills it.
_alg_oids = {}
_alg_oid_map = {}
class _ECKey(SSHKey):
"""Handler for elliptic curve public key encryption"""
pem_name = b'EC'
pkcs8_oid = ObjectIdentifier('1.2.840.10045.2.1')
def __init__(self, key):
    """Initialize the key and derive its algorithm names from the curve

       The SSH algorithm name is built by appending the curve id of
       `key` to ``ecdsa-sha2-``. It is the only signature algorithm,
       and its X.509 form adds an ``x509v3-`` prefix. The OID for the
       curve is looked up in the module's `_alg_oids` table, so a
       curve id missing from that table raises :exc:`KeyError`.

    """

    super().__init__(key)

    curve_id = key.curve_id

    self.algorithm = b'ecdsa-sha2-' + curve_id

    # Both tuples are built from the single algorithm name set above
    self.sig_algorithms = (self.algorithm,)
    self.x509_algorithms = (b'x509v3-' + self.algorithm,)

    self.all_sig_algorithms = set(self.sig_algorithms)

    self._alg_oid = _alg_oids[curve_id]
def __eq__(self, other):
# This isn't protected access - both objects are _ECKey instances
# pylint: disable=protected-access
return (isinstance(other, type(self)) and
from .asn1 import ASN1DecodeError, ObjectIdentifier, der_encode, der_decode
from .crypto import RSAPrivateKey, RSAPublicKey
from .misc import all_ints
from .packet import MPInt
from .public_key import SSHKey, SSHOpenSSHCertificateV01, KeyExportError
from .public_key import register_public_key_alg, register_certificate_alg
from .public_key import register_x509_certificate_alg
class _RSAKey(SSHKey):
"""Handler for RSA public key encryption"""
algorithm = b'ssh-rsa'
pem_name = b'RSA'
pkcs8_oid = ObjectIdentifier('1.2.840.113549.1.1.1')
sig_algorithms = (b'rsa-sha2-256', b'rsa-sha2-512', b'ssh-rsa')
x509_sig_algorithms = (b'rsa2048-sha256', b'ssh-rsa')
x509_algorithms = tuple(b'x509v3-' + alg for alg in x509_sig_algorithms)
all_sig_algorithms = set(x509_sig_algorithms + sig_algorithms)
def __eq__(self, other):
    """Return whether `other` is the same type of key with equal n, e, and d"""

    # This isn't protected access - both objects are _RSAKey instances
    # pylint: disable=protected-access

    if not isinstance(other, type(self)):
        return False

    # Compare in the order n, e, d and stop at the first mismatch
    return all(getattr(self._key, field) == getattr(other._key, field)
               for field in ('n', 'e', 'd'))
def __hash__(self):
return hash((self._key.n, self._key.e, self._key.d,