Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
error_pointer
)
# NOTE(review): excerpt of a larger routine - the enclosing `def`, the matching
# `try:` and the original indentation are not visible here.
# Check the error pointer filled in by the preceding call (not shown in full);
# handle_cf_error presumably raises when an error is present - confirm.
handle_cf_error(error_pointer)
# Wrap the payload bytes in a CoreFoundation object and attach it to the
# transform under the input attribute name.
cf_data = CFHelpers.cf_data_from_bytes(data)
Security.SecTransformSetAttribute(
sec_transform,
Security.kSecTransformInputAttributeName,
cf_data,
error_pointer
)
handle_cf_error(error_pointer)
# Execute the transform. A non-null error after execution is reported to the
# caller as an invalid signature rather than as a lower-level error.
res = Security.SecTransformExecute(sec_transform, error_pointer)
if not is_null(error_pointer):
error = unwrap(error_pointer)
if not is_null(error):
raise SignatureError('Signature is invalid')
# The execution result is read as a CFBoolean; a false value is also treated
# as an invalid signature.
res = bool(CoreFoundation.CFBooleanGetValue(res))
if not res:
raise SignatureError('Signature is invalid')
finally:
# Release each CoreFoundation object that is set (truthy) at this point.
if sec_transform:
CoreFoundation.CFRelease(sec_transform)
if cf_signature:
CoreFoundation.CFRelease(cf_signature)
if cf_data:
CoreFoundation.CFRelease(cf_data)
if cf_hash_length:
# NOTE(review): excerpt - the `if` branches that the two `else:` lines below
# pair with, the enclosing `def` and the original indentation are not visible.
else:
# This branch reads the parsed private key and tags the struct with the RSA2
# magic value.
parsed_key_info = key_info['private_key'].parsed
pubkey.magic = Advapi32Const.RSA2
pubkey.pubexp = parsed_key_info['public_exponent'].native
# Each integer is serialized unsigned at a fixed width (len1 or len2) and the
# resulting bytes are reversed before being appended to the blob.
# NOTE(review): the reversal is presumably a big-endian to little-endian
# conversion for the Windows blob layout - confirm against int_to_bytes.
blob_data = int_to_bytes(parsed_key_info['modulus'].native, signed=False, width=len1)[::-1]
blob_data += int_to_bytes(parsed_key_info['prime1'].native, signed=False, width=len2)[::-1]
blob_data += int_to_bytes(parsed_key_info['prime2'].native, signed=False, width=len2)[::-1]
blob_data += int_to_bytes(parsed_key_info['exponent1'].native, signed=False, width=len2)[::-1]
blob_data += int_to_bytes(parsed_key_info['exponent2'].native, signed=False, width=len2)[::-1]
blob_data += int_to_bytes(parsed_key_info['coefficient'].native, signed=False, width=len2)[::-1]
blob_data += int_to_bytes(parsed_key_info['private_exponent'].native, signed=False, width=len1)[::-1]
blob_struct.rsapubkey = pubkey
else:
# This branch fills a DSSPUBKEY struct instead of the RSA one.
pubkey_pointer = struct(advapi32, 'DSSPUBKEY')
pubkey = unwrap(pubkey_pointer)
pubkey.bitlen = bit_size
if key_type == 'public':
# Public key: DSS1 magic, parameters read from the `algorithm` field, key
# value serialized at width len1.
pubkey.magic = Advapi32Const.DSS1
params = key_info['algorithm']['parameters'].native
key_data = int_to_bytes(key_info['public_key'].parsed.native, signed=False, width=len1)[::-1]
else:
# Private key: DSS2 magic, parameters read from `private_key_algorithm`, key
# value serialized at a fixed width of 20 bytes.
pubkey.magic = Advapi32Const.DSS2
params = key_info['private_key_algorithm']['parameters'].native
key_data = int_to_bytes(key_info['private_key'].parsed.native, signed=False, width=20)[::-1]
blob_struct.dsspubkey = pubkey
# Blob payload order: p, q, g, then the key value; q uses a fixed width of 20
# bytes, p and g use len1.
blob_data = int_to_bytes(params['p'], signed=False, width=len1)[::-1]
blob_data += int_to_bytes(params['q'], signed=False, width=20)[::-1]
blob_data += int_to_bytes(params['g'], signed=False, width=len1)[::-1]
blob_data += key_data
'sha256': 32,
'sha384': 48,
'sha512': 64
}[hash_algorithm]
# NOTE(review): excerpt - the `if` governing this first branch, the enclosing
# `def` and the original indentation are not visible here.
# PSS branch: build a BCRYPT_PSS_PADDING_INFO struct whose salt length is set
# to hash_length.
flags = BcryptConst.BCRYPT_PAD_PSS
padding_info_struct_pointer = struct(bcrypt, 'BCRYPT_PSS_PADDING_INFO')
padding_info_struct = unwrap(padding_info_struct_pointer)
# This has to be assigned to a variable to prevent cffi from gc'ing it
hash_buffer = buffer_from_unicode(hash_constant)
padding_info_struct.pszAlgId = cast(bcrypt, 'wchar_t *', hash_buffer)
padding_info_struct.cbSalt = hash_length
else:
# PKCS#1 branch: build a BCRYPT_PKCS1_PADDING_INFO struct; for the 'raw' hash
# algorithm the algorithm id pointer is left null.
flags = BcryptConst.BCRYPT_PAD_PKCS1
padding_info_struct_pointer = struct(bcrypt, 'BCRYPT_PKCS1_PADDING_INFO')
padding_info_struct = unwrap(padding_info_struct_pointer)
# This has to be assigned to a variable to prevent cffi from gc'ing it
if hash_algorithm == 'raw':
padding_info_struct.pszAlgId = null()
else:
hash_buffer = buffer_from_unicode(hash_constant)
padding_info_struct.pszAlgId = cast(bcrypt, 'wchar_t *', hash_buffer)
# Either struct is handed on as an untyped pointer.
padding_info = cast(bcrypt, 'void *', padding_info_struct_pointer)
# Reject md5/sha1 when the DSA key is larger than 1024 bits.
# NOTE(review): the condition tests for md5 and sha1, while the message below
# speaks of keys "based on sha224, sha256 or sha512" - the wording looks
# muddled; confirm the intended message before changing the string.
if private_key.algorithm == 'dsa' and private_key.bit_size > 1024 and hash_algorithm in set(['md5', 'sha1']):
raise ValueError(pretty_message(
'''
Windows does not support sha1 signatures with DSA keys based on
sha224, sha256 or sha512
'''
))
# NOTE(review): excerpt - the enclosing `def`, the matching `try:` and the
# original indentation are not visible here.
# For private keys the private value, padded via fill_width to key_width, is
# appended to the blob.
if key_type == 'private':
blob += fill_width(private_bytes, key_width)
# Import the assembled blob; the resulting key handle is written through
# key_handle_pointer. The call passes BCRYPT_NO_KEY_VALIDATION as its flags.
key_handle_pointer = new(bcrypt, 'BCRYPT_KEY_HANDLE *')
res = bcrypt.BCryptImportKeyPair(
alg_handle,
null(),
blob_type,
key_handle_pointer,
blob,
len(blob),
BcryptConst.BCRYPT_NO_KEY_VALIDATION
)
handle_error(res)
key_handle = unwrap(key_handle_pointer)
# The handle is returned wrapped by `container` together with key_object.
return container(key_handle, key_object)
finally:
# The algorithm handle is closed whenever it is set, on success or failure.
if alg_handle:
close_alg_handle(alg_handle)
# NOTE(review): excerpt - the enclosing `def`, the creation of ocsp_oid /
# ocsp_oid_buffer / ocsp_oid_pointer and the original indentation are not
# visible here.
# Point the OID struct at the buffer holding the OID bytes.
ocsp_oid.Data = cast(Security, 'char *', ocsp_oid_buffer)
# Create a policy search for X.509v3 certificates using that OID, then take
# the first policy it yields.
ocsp_search_ref_pointer = new(Security, 'SecPolicySearchRef *')
result = Security.SecPolicySearchCreate(
SecurityConst.CSSM_CERT_X_509v3,
ocsp_oid_pointer,
null(),
ocsp_search_ref_pointer
)
handle_sec_error(result)
ocsp_search_ref = unwrap(ocsp_search_ref_pointer)
ocsp_policy_ref_pointer = new(Security, 'SecPolicyRef *')
result = Security.SecPolicySearchCopyNext(ocsp_search_ref, ocsp_policy_ref_pointer)
handle_sec_error(result)
ocsp_policy_ref = unwrap(ocsp_policy_ref_pointer)
# Fill an OCSP options struct with the version constant and two flags.
# NOTE(review): judging by the constant names, the flags disable network
# access and cache reads for OCSP - confirm against the Security headers.
ocsp_struct_pointer = struct(Security, 'CSSM_APPLE_TP_OCSP_OPTIONS')
ocsp_struct = unwrap(ocsp_struct_pointer)
ocsp_struct.Version = SecurityConst.CSSM_APPLE_TP_OCSP_OPTS_VERSION
ocsp_struct.Flags = (
SecurityConst.CSSM_TP_ACTION_OCSP_DISABLE_NET |
SecurityConst.CSSM_TP_ACTION_OCSP_CACHE_READ_DISABLE
)
# Serialize the options struct and describe the bytes with a CSSM_DATA
# (length plus pointer). The buffer is kept in its own variable, presumably so
# the memory stays alive while the pointer is in use.
ocsp_struct_bytes = struct_bytes(ocsp_struct_pointer)
cssm_data_pointer = struct(Security, 'CSSM_DATA')
cssm_data = unwrap(cssm_data_pointer)
cssm_data.Length = len(ocsp_struct_bytes)
ocsp_struct_buffer = buffer_from_bytes(ocsp_struct_bytes)
cssm_data.Data = cast(Security, 'char *', ocsp_struct_buffer)
# NOTE(review): excerpt - the enclosing `def`, the `finally:`/`except` that
# closes the `try:` and the original indentation are not visible here.
# Map the cipher name to its algorithm constant; a cipher name that is not a
# key of this dict raises KeyError from the lookup.
alg_constant = {
'aes': BcryptConst.BCRYPT_AES_ALGORITHM,
'des': BcryptConst.BCRYPT_DES_ALGORITHM,
'tripledes_2key': BcryptConst.BCRYPT_3DES_112_ALGORITHM,
'tripledes_3key': BcryptConst.BCRYPT_3DES_ALGORITHM,
'rc2': BcryptConst.BCRYPT_RC2_ALGORITHM,
'rc4': BcryptConst.BCRYPT_RC4_ALGORITHM,
}[cipher]
try:
alg_handle = open_alg_handle(alg_constant)
# Build the key blob: a BCRYPT_KEY_DATA_BLOB_HEADER (magic, version, key
# length) immediately followed by the raw key bytes.
blob_type = BcryptConst.BCRYPT_KEY_DATA_BLOB
blob_struct_pointer = struct(bcrypt, 'BCRYPT_KEY_DATA_BLOB_HEADER')
blob_struct = unwrap(blob_struct_pointer)
blob_struct.dwMagic = BcryptConst.BCRYPT_KEY_DATA_BLOB_MAGIC
blob_struct.dwVersion = BcryptConst.BCRYPT_KEY_DATA_BLOB_VERSION1
blob_struct.cbKeyData = len(key)
blob = struct_bytes(blob_struct_pointer) + key
# RC2 only: set the effective key length property on the algorithm handle to
# the key size in bits (len(key) * 8). The literal 4 is the size argument -
# presumably the byte size of the DWORD value; confirm.
if cipher == 'rc2':
buf = new(bcrypt, 'DWORD *', len(key) * 8)
res = bcrypt.BCryptSetProperty(
alg_handle,
BcryptConst.BCRYPT_EFFECTIVE_KEY_LENGTH,
buf,
4,
0
)
handle_error(res)
# NOTE(review): excerpt - the enclosing `def`, the creation of
# ocsp_oid_pointer / ocsp_search_ref_pointer and the original indentation are
# not visible here.
# Create a policy search for X.509v3 certificates using the OID pointer, then
# take the first policy it yields.
result = Security.SecPolicySearchCreate(
SecurityConst.CSSM_CERT_X_509v3,
ocsp_oid_pointer,
null(),
ocsp_search_ref_pointer
)
handle_sec_error(result)
ocsp_search_ref = unwrap(ocsp_search_ref_pointer)
ocsp_policy_ref_pointer = new(Security, 'SecPolicyRef *')
result = Security.SecPolicySearchCopyNext(ocsp_search_ref, ocsp_policy_ref_pointer)
handle_sec_error(result)
ocsp_policy_ref = unwrap(ocsp_policy_ref_pointer)
# Fill an OCSP options struct with the version constant and two flags.
# NOTE(review): judging by the constant names, the flags disable network
# access and cache reads for OCSP - confirm against the Security headers.
ocsp_struct_pointer = struct(Security, 'CSSM_APPLE_TP_OCSP_OPTIONS')
ocsp_struct = unwrap(ocsp_struct_pointer)
ocsp_struct.Version = SecurityConst.CSSM_APPLE_TP_OCSP_OPTS_VERSION
ocsp_struct.Flags = (
SecurityConst.CSSM_TP_ACTION_OCSP_DISABLE_NET |
SecurityConst.CSSM_TP_ACTION_OCSP_CACHE_READ_DISABLE
)
# Serialize the options struct and describe the bytes with a CSSM_DATA
# (length plus pointer). The buffer is kept in its own variable, presumably so
# the memory stays alive while the pointer is in use.
ocsp_struct_bytes = struct_bytes(ocsp_struct_pointer)
cssm_data_pointer = struct(Security, 'CSSM_DATA')
cssm_data = unwrap(cssm_data_pointer)
cssm_data.Length = len(ocsp_struct_bytes)
ocsp_struct_buffer = buffer_from_bytes(ocsp_struct_bytes)
cssm_data.Data = cast(Security, 'char *', ocsp_struct_buffer)
# Apply the serialized options to the policy obtained above.
result = Security.SecPolicySetValue(ocsp_policy_ref, cssm_data_pointer)
handle_sec_error(result)
# NOTE(review): excerpt - the enclosing `def`, the call whose result is checked
# on the first line and the original indentation are not visible here.
handle_error(res)
# Allocate a buffer of the length read from private_out_len, then export the
# private key blob into it.
private_buffer_length = deref(private_out_len)
private_buffer = buffer_from_bytes(private_buffer_length)
res = bcrypt.BCryptExportKey(
key_handle,
null(),
private_blob_type,
private_buffer,
private_buffer_length,
private_out_len,
0
)
handle_error(res)
# View the start of the buffer as the header struct; the bytes after the
# header (struct_size onward) are the key material.
private_blob_struct_pointer = struct_from_buffer(bcrypt, struct_type, private_buffer)
private_blob_struct = unwrap(private_blob_struct_pointer)
struct_size = sizeof(bcrypt, private_blob_struct)
private_blob = bytes_from_buffer(private_buffer, private_buffer_length)[struct_size:]
# Pick the blob interpreter by algorithm. For DSA the second argument is 2
# when bit_size exceeds 1024 and 1 otherwise; anything that is neither 'rsa'
# nor 'dsa' is handled by the EC interpreter.
if algorithm == 'rsa':
private_key = _bcrypt_interpret_rsa_key_blob('private', private_blob_struct, private_blob)
elif algorithm == 'dsa':
if bit_size > 1024:
private_key = _bcrypt_interpret_dsa_key_blob('private', 2, private_blob_struct, private_blob)
else:
private_key = _bcrypt_interpret_dsa_key_blob('private', 1, private_blob_struct, private_blob)
else:
private_key = _bcrypt_interpret_ec_key_blob('private', private_blob_struct, private_blob)
# Call the export with a null output buffer and zero size so that only
# public_out_len is filled in - presumably with the required buffer size.
public_out_len = new(bcrypt, 'ULONG *')
res = bcrypt.BCryptExportKey(key_handle, null(), public_blob_type, null(), 0, public_out_len, 0)
handle_error(res)
"""
Reads end-entity and intermediate certificate information from the
TLS session
"""
# NOTE(review): excerpt - the `def` line of this method, the rest of the loop
# body, the handler closing the `try:` and the original indentation are not
# visible here.
# Query the security context for the remote certificate context; failures are
# passed to handle_error together with TLSError.
cert_context_pointer_pointer = new(crypt32, 'CERT_CONTEXT **')
result = secur32.QueryContextAttributesW(
self._context_handle_pointer,
Secur32Const.SECPKG_ATTR_REMOTE_CERT_CONTEXT,
cert_context_pointer_pointer
)
handle_error(result, TLSError)
# Dereference down to the CERT_CONTEXT struct and copy out its encoded
# certificate bytes (pbCertEncoded, cbCertEncoded bytes long).
cert_context_pointer = unwrap(cert_context_pointer_pointer)
cert_context_pointer = cast(crypt32, 'CERT_CONTEXT *', cert_context_pointer)
cert_context = unwrap(cert_context_pointer)
cert_data = bytes_from_buffer(cert_context.pbCertEncoded, native(int, cert_context.cbCertEncoded))
# Parse and store the peer certificate; the intermediates list starts empty.
self._certificate = Asn1Certificate.load(cert_data)
self._intermediates = []
# Initialized before the try block so the name is bound even if the code
# below raises before assigning it.
store_handle = None
try:
# Enumerate the certificates in the store attached to the context; passing
# null as the previous context starts the enumeration.
store_handle = cert_context.hCertStore
context_pointer = crypt32.CertEnumCertificatesInStore(store_handle, null())
while not is_null(context_pointer):
context = unwrap(context_pointer)
data = bytes_from_buffer(context.pbCertEncoded, native(int, context.cbCertEncoded))
# The cert store seems to include the end-entity certificate as
# the last entry, but we already have that from the struct.
if data != cert_data: