Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _load_cert_object(self, *path_components):
with open(os.path.join(fixtures_dir, *path_components), 'rb') as f:
cert_bytes = f.read()
if pem.detect(cert_bytes):
_, _, cert_bytes = pem.unarmor(cert_bytes)
cert = x509.Certificate.load(cert_bytes)
return cert
def _load_cert(self, relative_path):
with open(os.path.join(fixtures_dir, relative_path), 'rb') as f:
cert_bytes = f.read()
if pem.detect(cert_bytes):
_, _, cert_bytes = pem.unarmor(cert_bytes)
return x509.Certificate.load(cert_bytes)
def _read_full_chain(self):
with open(self.filename, 'rb') as f_obj:
pem_obj = asn1crypto.pem.unarmor(f_obj.read(), multiple=True)
try:
for type_name, _, der_bytes in pem_obj:
if type_name == 'CERTIFICATE':
crt = asn1crypto.x509.Certificate.load(der_bytes)
if crt.ca:
LOG.info("Found part of the chain..")
self.intermediates.append(crt)
else:
LOG.info("Found the end entity..")
self.end_entity = crt
self.ocsp_urls = crt.ocsp_urls
except binascii.Error:
LOG.error(
"Certificate file contains errors \"%s\".",
self.filename
)
if self.end_entity is None:
LOG.error(
"Can't find server certificate items for \"%s\".",
self.filename
for cert in certs:
if isinstance(cert, x509.Certificate):
output.append(cert)
else:
if not isinstance(cert, byte_cls):
raise TypeError(pretty_message(
'''
%s must contain only byte strings or
asn1crypto.x509.Certificate objects, not %s
''',
var_name,
type_name(cert)
))
if pem.detect(cert):
_, _, cert = pem.unarmor(cert)
output.append(x509.Certificate.load(cert))
return output
# In the main loop, only complete CRLs are processed, so delta CRLs are
# weeded out of the todo list
crls_to_process = []
for issuer_crls in complete_lists_by_issuer.values():
crls_to_process.extend(issuer_crls)
total_crls = len(crls_to_process)
# Build a lookup table for the Distribution point objects associated with
# an issuer name hashable
distribution_point_map = {}
sources = [cert.crl_distribution_points]
if use_deltas:
sources.extend(cert.delta_crl_distribution_points)
for dp_list in sources:
for distribution_point in dp_list:
if isinstance(distribution_point['crl_issuer'], x509.GeneralNames):
dp_name_hashes = []
for general_name in distribution_point['crl_issuer']:
if general_name.name == 'directory_name':
dp_name_hashes.append(general_name.chosen.hashable)
else:
dp_name_hashes = [cert.issuer.hashable]
for dp_name_hash in dp_name_hashes:
if dp_name_hash not in distribution_point_map:
distribution_point_map[dp_name_hash] = []
distribution_point_map[dp_name_hash].append(distribution_point)
valid_reasons = set([
'key_compromise',
'ca_compromise',
'affiliation_changed',
'superseded',
A list of unicode strings to use as the values for the new
x509.GeneralName objects
"""
if self._subject_alt_name is not None:
filtered_general_names = []
for general_name in self._subject_alt_name:
if general_name.name != name:
filtered_general_names.append(general_name)
self._subject_alt_name = x509.GeneralNames(filtered_general_names)
else:
self._subject_alt_name = x509.GeneralNames()
if values is not None:
for value in values:
new_general_name = x509.GeneralName(name=name, value=value)
self._subject_alt_name.append(new_general_name)
if len(self._subject_alt_name) == 0:
self._subject_alt_name = None
def get_tbs_certificate_from_leaf_bytes(leaf: bytes) -> asn1crypto.x509.TbsCertificate:
leaf_cert = ctl_parser_structures.MerkleTreeLeaf.parse(leaf).TimestampedEntry
if leaf_cert.LogEntryType == "X509LogEntryType":
# We have a normal x509 entry
cert = asn1crypto.x509.Certificate.load(leaf_cert.Entry.CertData)
tbsCert = cert["tbs_certificate"]
else:
# We have a precert entry
tbsCert = asn1crypto.x509.TbsCertificate.load(leaf_cert.Entry.TBSCertificate)
return tbsCert
def get_pretty_dn(name, rdn_separator=None, type_value_separator=None, include_oid=None, include_string_type=None):
if not isinstance(name, x509.Name):
raise TypeError("name must be an x509.Name")
s = ""
if rdn_separator is None:
rdn_separator = ", "
if type_value_separator is None:
type_value_separator = "="
if include_oid is None:
include_oid = False
if include_string_type is None:
include_string_type = False
string_type = ''
def _csr_info(self, subject, public_key, sans):
"""
Create the csr info portion of the certificate request"s ASN.1
structure
:param X509Name subject: subject to add to the certificate request
:param asymmetric.PublicKey public_key: public key to use when creating
the certificate request"s signature
:param sans: collection of dns names to insert into a subjAltName
extension for the certificate request
:type sans: None or list(str) or tuple(str) or set(str)
:return: the certificate request info structure
:rtype: csr.CertificationRequestInfo
"""
x509_subject = x509.Name.build(self._subject_as_dict(subject))
extensions = [(u"basic_constraints",
x509.BasicConstraints({"ca": False}),
False),
(u"key_usage",
x509.KeyUsage({"digital_signature",
"key_encipherment"}),
True),
(u"extended_key_usage",
x509.ExtKeyUsageSyntax([u"client_auth"]),
False)]
if sans:
names = x509.GeneralNames()
for san in sans:
names.append(x509.GeneralName("dns_name",
_bytes_to_unicode(san)))
extensions.append((u"subject_alt_name", names, False))
def certsign(self, sn, pubKey, subject, until, caprivKey):
tbs = asn1x509.TbsCertificate({
'version': 'v1',
'serial_number': sn,
'issuer': asn1x509.Name.build({
'common_name': 'hsm CA',
}),
'subject': asn1x509.Name.build({
'common_name': subject,
}),
'signature': {
'algorithm': 'sha256_rsa',
'parameters': None,
},
'validity': {
'not_before': asn1x509.Time({
'utc_time': datetime.datetime.now(tz=asn1util.timezone.utc) - datetime.timedelta(days=1),
}),