Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_dsa_public_key_attr(self):
private = asymmetric.load_private_key(os.path.join(fixtures_dir, 'keys/test-dsa-1024.key'))
public = asymmetric.load_public_key(os.path.join(fixtures_dir, 'keys/test-dsa-1024.crt'))
computed_public = private.public_key
self.assertEqual(public.asn1.dump(), computed_public.asn1.dump())
def test_rsa(self):
# A key we generated earlier
self.session.generate_keypair(KeyType.RSA, 1024)
pub = self.session.get_key(key_type=KeyType.RSA,
object_class=ObjectClass.PUBLIC_KEY)
pub = encode_rsa_public_key(pub)
from oscrypto.asymmetric import load_public_key, rsa_pkcs1v15_encrypt
pub = load_public_key(pub)
crypttext = rsa_pkcs1v15_encrypt(pub, b'Data to encrypt')
priv = self.session.get_key(key_type=KeyType.RSA,
object_class=ObjectClass.PRIVATE_KEY)
plaintext = priv.decrypt(crypttext, mechanism=Mechanism.RSA_PKCS)
self.assertEqual(plaintext, b'Data to encrypt')
def test_rsa_public_key_unwrap(self):
public = asymmetric.load_public_key(os.path.join(fixtures_dir, 'keys/test.crt'))
self.assertIsInstance(public.unwrap(), keys.RSAPublicKey)
def test_rsa_pss_verify_fail(self):
with open(os.path.join(fixtures_dir, 'message.txt'), 'rb') as f:
original_data = f.read()
with open(os.path.join(fixtures_dir, 'rsa_pss_signature'), 'rb') as f:
signature = f.read()
public = asymmetric.load_public_key(os.path.join(fixtures_dir, 'keys/test.crt'))
with self.assertRaises(errors.SignatureError):
asymmetric.rsa_pss_verify(public, signature, original_data + b'1', 'sha1')
def test_dsa_verify_fail(self):
with open(os.path.join(fixtures_dir, 'message.txt'), 'rb') as f:
original_data = f.read()
with open(os.path.join(fixtures_dir, 'dsa_signature'), 'rb') as f:
signature = f.read()
public = asymmetric.load_public_key(os.path.join(fixtures_dir, 'keys/test-dsa-1024.crt'))
with self.assertRaises(errors.SignatureError):
asymmetric.dsa_verify(public, signature, original_data + b'1', 'sha1')
def test_rsa_sign(self):
original_data = b'This is data to sign'
private = asymmetric.load_private_key(os.path.join(fixtures_dir, 'keys/test.key'))
public = asymmetric.load_public_key(os.path.join(fixtures_dir, 'keys/test.crt'))
signature = asymmetric.rsa_pkcs1v15_sign(private, original_data, 'sha1')
self.assertIsInstance(signature, byte_cls)
asymmetric.rsa_pkcs1v15_verify(public, signature, original_data, 'sha1')
def _sign(csr, buf, profile, skip_notify=False, skip_push=False, overwrite=False, signer=None):
# TODO: CRLDistributionPoints, OCSP URL, Certificate URL
assert buf.startswith(b"-----BEGIN ")
assert isinstance(csr, CertificationRequest)
csr_pubkey = asymmetric.load_public_key(csr["certification_request_info"]["subject_pk_info"])
common_name = csr["certification_request_info"]["subject"].native["common_name"]
cert_path = os.path.join(config.SIGNED_DIR, "%s.pem" % common_name)
renew = False
attachments = [
(buf, "application/x-pem-file", common_name + ".csr"),
]
revoked_path = None
overwritten = False
# Move existing certificate if necessary
if os.path.exists(cert_path):
with open(cert_path, "rb") as fh:
prev_buf = fh.read()
header, _, der_bytes = pem.unarmor(prev_buf)
elif signature_algo == 'dsa':
verify_func = asymmetric.dsa_verify
elif signature_algo == 'ecdsa':
verify_func = asymmetric.ecdsa_verify
else:
raise PathValidationError(pretty_message(
'''
The path could not be validated because the signature of %s
uses the unsupported algorithm %s
''',
_cert_type(index, last_index, end_entity_name_override, definite=True),
signature_algo
))
try:
key_object = asymmetric.load_public_key(working_public_key)
verify_func(key_object, cert['signature_value'].native, cert['tbs_certificate'].dump(), hash_algo)
except (oscrypto.errors.SignatureError):
raise PathValidationError(pretty_message(
'''
The path could not be validated because the signature of %s
could not be verified
''',
_cert_type(index, last_index, end_entity_name_override, definite=True)
))
# Step 2 a 2
if not validation_context.is_whitelisted(cert):
validity = cert['tbs_certificate']['validity']
if moment < validity['not_before'].native:
raise PathValidationError(pretty_message(
(buf, "application/x-pem-file", common_name + ".csr"),
]
revoked_path = None
overwritten = False
# Move existing certificate if necessary
if os.path.exists(cert_path):
with open(cert_path, "rb") as fh:
prev_buf = fh.read()
header, _, der_bytes = pem.unarmor(prev_buf)
prev = x509.Certificate.load(der_bytes)
# TODO: assert validity here again?
renew = \
asymmetric.load_public_key(prev["tbs_certificate"]["subject_public_key_info"]) == \
csr_pubkey
# BUGBUG: is this enough?
if overwrite:
# TODO: is this the best approach?
# TODO: why didn't unittest detect bugs here?
prev_serial_hex = "%x" % prev.serial_number
revoked_path = os.path.join(config.REVOKED_DIR, "%040x.pem" % prev.serial_number)
os.rename(cert_path, revoked_path)
attachments += [(prev_buf, "application/x-pem-file", "deprecated.crt" if renew else "overwritten.crt")]
overwritten = True
else:
raise FileExistsError("Will not overwrite existing certificate")
builder = CertificateBuilder(cn_to_dn(common_name, const.FQDN,
o=certificate["tbs_certificate"]["subject"].native.get("organization_name"),
elif signature_algo == 'dsa':
verify_func = asymmetric.dsa_verify
elif signature_algo == 'ecdsa':
verify_func = asymmetric.ecdsa_verify
else:
raise PathValidationError(pretty_message(
'''
The path could not be validated because the signature of %s
uses the unsupported algorithm %s
''',
_cert_type(index, last_index, end_entity_name_override, definite=True),
signature_algo
))
try:
key_object = asymmetric.load_public_key(working_public_key)
verify_func(key_object, cert['signature_value'].native, cert['tbs_certificate'].dump(), hash_algo)
except (oscrypto.errors.SignatureError):
raise PathValidationError(pretty_message(
'''
The path could not be validated because the signature of %s
could not be verified
''',
_cert_type(index, last_index, end_entity_name_override, definite=True)
))
# Step 2 a 2
if not validation_context.is_whitelisted(cert):
validity = cert['tbs_certificate']['validity']
if moment < validity['not_before'].native:
raise PathValidationError(pretty_message(