Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
sec_transform = None
try:
cf_dict = CFHelpers.cf_dictionary_from_pairs([(Security.kSecAttrKeyType, cipher)])
cf_key = CFHelpers.cf_data_from_bytes(key)
cf_data = CFHelpers.cf_data_from_bytes(data)
error_pointer = new(CoreFoundation, 'CFErrorRef *')
sec_key = Security.SecKeyCreateFromData(cf_dict, cf_key, error_pointer)
handle_cf_error(error_pointer)
sec_transform = Security.SecEncryptTransformCreate(sec_key, error_pointer)
handle_cf_error(error_pointer)
if cipher != Security.kSecAttrKeyTypeRC4:
Security.SecTransformSetAttribute(sec_transform, Security.kSecModeCBCKey, null(), error_pointer)
handle_cf_error(error_pointer)
Security.SecTransformSetAttribute(sec_transform, Security.kSecPaddingKey, padding, error_pointer)
handle_cf_error(error_pointer)
cf_iv = CFHelpers.cf_data_from_bytes(iv)
Security.SecTransformSetAttribute(sec_transform, Security.kSecIVKey, cf_iv, error_pointer)
handle_cf_error(error_pointer)
Security.SecTransformSetAttribute(
sec_transform,
Security.kSecTransformInputAttributeName,
cf_data,
error_pointer
)
handle_cf_error(error_pointer)
if is_null(stack_pointer):
handle_openssl_error(0, TLSError)
if libcrypto_version_info < (1, 1):
number_certs = libssl.sk_num(stack_pointer)
else:
number_certs = libssl.OPENSSL_sk_num(stack_pointer)
self._intermediates = []
for index in range(0, number_certs):
if libcrypto_version_info < (1, 1):
x509_ = libssl.sk_value(stack_pointer, index)
else:
x509_ = libssl.OPENSSL_sk_value(stack_pointer, index)
buffer_size = libcrypto.i2d_X509(x509_, null())
cert_buffer = buffer_from_bytes(buffer_size)
cert_pointer = buffer_pointer(cert_buffer)
cert_length = libcrypto.i2d_X509(x509_, cert_pointer)
handle_openssl_error(cert_length)
cert_data = bytes_from_buffer(cert_buffer, cert_length)
cert = Asn1Certificate.load(cert_data)
if index == 0:
self._certificate = cert
else:
self._intermediates.append(cert)
schannel_cred.dwMaximumCipherStrength = 0
# Default session lifetime is 10 hours
schannel_cred.dwSessionLifespan = 0
schannel_cred.dwFlags = flags
schannel_cred.dwCredFormat = 0
cred_handle_pointer = new(secur32, 'CredHandle *')
result = secur32.AcquireCredentialsHandleW(
null(),
Secur32Const.UNISP_NAME,
Secur32Const.SECPKG_CRED_OUTBOUND,
null(),
schannel_cred_pointer,
null(),
null(),
cred_handle_pointer,
null()
)
handle_error(result)
self._credentials_handle = cred_handle_pointer
if iv is None:
iv_len = 0
else:
iv_len = len(iv)
flags = 0
if padding is True:
flags = BcryptConst.BCRYPT_BLOCK_PADDING
out_len = new(bcrypt, 'ULONG *')
res = bcrypt.BCryptEncrypt(
key_handle,
data,
len(data),
null(),
null(),
0,
null(),
0,
out_len,
flags
)
handle_error(res)
buffer_len = deref(out_len)
buffer = buffer_from_bytes(buffer_len)
iv_buffer = buffer_from_bytes(iv) if iv else null()
res = bcrypt.BCryptEncrypt(
key_handle,
data,
handshake_server_bytes += bytes_read
self._received_bytes += bytes_read
in_buffers[0].cbBuffer = len(self._received_bytes)
write_to_buffer(in_data_buffer, self._received_bytes)
result = secur32.InitializeSecurityContextW(
self._session._credentials_handle,
temp_context_handle_pointer,
self._hostname,
self._context_flags,
0,
0,
in_sec_buffer_desc_pointer,
0,
null(),
out_sec_buffer_desc_pointer,
output_context_flags_pointer,
null()
)
if result == Secur32Const.SEC_E_INCOMPLETE_MESSAGE:
in_buffers[0].BufferType = Secur32Const.SECBUFFER_TOKEN
# Windows 10 seems to fill the second input buffer with
# a BufferType of SECBUFFER_MISSING (4), which if not
# cleared causes the handshake to fail.
if in_buffers[1].BufferType != Secur32Const.SECBUFFER_EMPTY:
in_buffers[1].BufferType = Secur32Const.SECBUFFER_EMPTY
in_buffers[1].cbBuffer = 0
if not is_null(in_buffers[1].pvBuffer):
secur32.FreeContextBuffer(in_buffers[1].pvBuffer)
in_buffers[1].pvBuffer = null()
if certificate_or_public_key.algorithm == 'rsa':
if rsa_pss_padding:
flags = BcryptConst.BCRYPT_PAD_PSS
padding_info_struct_pointer = struct(bcrypt, 'BCRYPT_PSS_PADDING_INFO')
padding_info_struct = unwrap(padding_info_struct_pointer)
# This has to be assigned to a variable to prevent cffi from gc'ing it
hash_buffer = buffer_from_unicode(hash_constant)
padding_info_struct.pszAlgId = cast(bcrypt, 'wchar_t *', hash_buffer)
padding_info_struct.cbSalt = len(digest)
else:
flags = BcryptConst.BCRYPT_PAD_PKCS1
padding_info_struct_pointer = struct(bcrypt, 'BCRYPT_PKCS1_PADDING_INFO')
padding_info_struct = unwrap(padding_info_struct_pointer)
# This has to be assigned to a variable to prevent cffi from gc'ing it
if hash_algorithm == 'raw':
padding_info_struct.pszAlgId = null()
else:
hash_buffer = buffer_from_unicode(hash_constant)
padding_info_struct.pszAlgId = cast(bcrypt, 'wchar_t *', hash_buffer)
padding_info = cast(bcrypt, 'void *', padding_info_struct_pointer)
else:
# Windows doesn't use the ASN.1 Sequence for DSA/ECDSA signatures,
# so we have to convert it here for the verification to work
try:
signature = DSASignature.load(signature).to_p1363()
except (ValueError, OverflowError, TypeError):
raise SignatureError('Signature is invalid')
res = bcrypt.BCryptVerifySignature(
certificate_or_public_key.key_handle,
padding_info,
digest,
format_pointer = new(Security, 'uint32_t *')
pointer_set(format_pointer, SecurityConst.kSecFormatOpenSSL)
type_pointer = new(Security, 'uint32_t *')
pointer_set(type_pointer, item_type)
keys_pointer = new(CoreFoundation, 'CFArrayRef *')
attr_array = CFHelpers.cf_array_from_list([
Security.kSecAttrIsExtractable
])
import_export_params_pointer = new(Security, 'SecItemImportExportKeyParameters *')
import_export_params = unwrap(import_export_params_pointer)
import_export_params.version = 0
import_export_params.flags = 0
import_export_params.passphrase = null()
import_export_params.alertTitle = null()
import_export_params.alertPrompt = null()
import_export_params.accessRef = null()
import_export_params.keyUsage = null()
import_export_params.keyAttributes = attr_array
res = Security.SecItemImport(
cf_source,
null(),
format_pointer,
type_pointer,
0,
import_export_params_pointer,
null(),
keys_pointer
)
handle_sec_error(res)
def _load_x509(certificate):
"""
Loads an ASN.1 object of an x509 certificate into a Certificate object
:param certificate:
An asn1crypto.x509.Certificate object
:return:
A Certificate object
"""
source = certificate.dump()
buffer = buffer_from_bytes(source)
evp_pkey = libcrypto.d2i_X509(null(), buffer_pointer(buffer), len(source))
if is_null(evp_pkey):
handle_openssl_error(0)
return Certificate(evp_pkey, certificate)