Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
private_key must an instance of the PrivateKey class, not %s
''',
type_name(private_key)
))
if not isinstance(ciphertext, byte_cls):
raise TypeError(pretty_message(
'''
data must be a byte string, not %s
''',
type_name(ciphertext)
))
key_length = private_key.byte_size
buffer = buffer_from_bytes(key_length)
output_length = new(Security, 'size_t *', key_length)
if osx_version_info < (10, 8):
padding = SecurityConst.kSecPaddingNone
else:
padding = SecurityConst.kSecPaddingPKCS1
result = Security.SecKeyDecrypt(
private_key.sec_key_ref,
padding,
ciphertext,
len(ciphertext),
buffer,
output_length
)
handle_sec_error(result)
signature,
len(signature)
)
# errSSLCrypto is returned in some situations on macOS 10.12
if result == SecurityConst.errSecVerifyFailed or result == SecurityConst.errSSLCrypto:
raise SignatureError('Signature is invalid')
handle_sec_error(result)
return
cf_signature = None
cf_data = None
cf_hash_length = None
sec_transform = None
try:
error_pointer = new(CoreFoundation, 'CFErrorRef *')
cf_signature = CFHelpers.cf_data_from_bytes(signature)
sec_transform = Security.SecVerifyTransformCreate(
certificate_or_public_key.sec_key_ref,
cf_signature,
error_pointer
)
handle_cf_error(error_pointer)
hash_constant = {
'md5': Security.kSecDigestMD5,
'sha1': Security.kSecDigestSHA1,
'sha224': Security.kSecDigestSHA2,
'sha256': Security.kSecDigestSHA2,
'sha384': Security.kSecDigestSHA2,
'sha512': Security.kSecDigestSHA2
}[hash_algorithm]
ValueError - when any of the parameters contain an invalid value
TypeError - when any of the parameters are of the wrong type
OSError - when an error is returned by the OS crypto library
:return:
A byte string of the plaintext
"""
flags = 0
if rsa_oaep_padding:
flags = Advapi32Const.CRYPT_OAEP
ciphertext = ciphertext[::-1]
buffer = buffer_from_bytes(ciphertext)
out_len = new(advapi32, 'DWORD *', len(ciphertext))
res = advapi32.CryptDecrypt(
private_key.ex_key_handle,
null(),
True,
flags,
buffer,
out_len
)
handle_error(res)
return bytes_from_buffer(buffer, deref(out_len))
if cipher != Security.kSecAttrKeyTypeRC4 and not padding:
raise ValueError('padding must be specified')
cf_dict = None
cf_key = None
cf_data = None
cf_iv = None
sec_key = None
sec_transform = None
try:
cf_dict = CFHelpers.cf_dictionary_from_pairs([(Security.kSecAttrKeyType, cipher)])
cf_key = CFHelpers.cf_data_from_bytes(key)
cf_data = CFHelpers.cf_data_from_bytes(data)
error_pointer = new(CoreFoundation, 'CFErrorRef *')
sec_key = Security.SecKeyCreateFromData(cf_dict, cf_key, error_pointer)
handle_cf_error(error_pointer)
sec_transform = Security.SecDecryptTransformCreate(sec_key, error_pointer)
handle_cf_error(error_pointer)
if cipher != Security.kSecAttrKeyTypeRC4:
Security.SecTransformSetAttribute(sec_transform, Security.kSecModeCBCKey, null(), error_pointer)
handle_cf_error(error_pointer)
Security.SecTransformSetAttribute(sec_transform, Security.kSecPaddingKey, padding, error_pointer)
handle_cf_error(error_pointer)
cf_iv = CFHelpers.cf_data_from_bytes(iv)
Security.SecTransformSetAttribute(sec_transform, Security.kSecIVKey, cf_iv, error_pointer)
handle_cf_error(error_pointer)
def _os_buffered_size(self):
    """
    Reports how many bytes of already-decrypted data are sitting in the
    Secure Transport read buffer. That many bytes can be pulled out via
    SSLRead() without having to call self._socket.recv().

    :return:
        An integer - the number of available bytes
    """

    # Out-parameter that SSLGetBufferedReadSize() fills in
    buffered_size_pointer = new(Security, 'size_t *')

    status = Security.SSLGetBufferedReadSize(self._session_context, buffered_size_pointer)
    # Check the status code before trusting the value behind the pointer
    handle_sec_error(status)

    return deref(buffered_size_pointer)
sec_access_ref,
public_key_pointer,
private_key_pointer
)
handle_sec_error(result)
public_key_ref = unwrap(public_key_pointer)
private_key_ref = unwrap(private_key_pointer)
cf_data_public_pointer = new(CoreFoundation, 'CFDataRef *')
result = Security.SecItemExport(public_key_ref, 0, 0, null(), cf_data_public_pointer)
handle_sec_error(result)
cf_data_public = unwrap(cf_data_public_pointer)
public_key_bytes = CFHelpers.cf_data_to_bytes(cf_data_public)
cf_data_private_pointer = new(CoreFoundation, 'CFDataRef *')
result = Security.SecItemExport(private_key_ref, 0, 0, null(), cf_data_private_pointer)
handle_sec_error(result)
cf_data_private = unwrap(cf_data_private_pointer)
private_key_bytes = CFHelpers.cf_data_to_bytes(cf_data_private)
# Clean the new keys out of the keychain
result = Security.SecKeychainItemDelete(public_key_ref)
handle_sec_error(result)
result = Security.SecKeychainItemDelete(private_key_ref)
handle_sec_error(result)
finally:
if cf_dict:
CoreFoundation.CFRelease(cf_dict)
if public_key_ref:
CoreFoundation.CFRelease(public_key_ref)
PublicKey class, not %s
''',
type_name(certificate_or_public_key)
))
if not isinstance(data, byte_cls):
raise TypeError(pretty_message(
'''
data must be a byte string, not %s
''',
type_name(data)
))
key_length = certificate_or_public_key.byte_size
buffer = buffer_from_bytes(key_length)
output_length = new(Security, 'size_t *', key_length)
result = Security.SecKeyEncrypt(
certificate_or_public_key.sec_key_ref,
SecurityConst.kSecPaddingPKCS1,
data,
len(data),
buffer,
output_length
)
handle_sec_error(result)
return bytes_from_buffer(buffer, deref(output_length))
try:
public_key_pointer = new(Security, 'SecKeyRef *')
private_key_pointer = new(Security, 'SecKeyRef *')
cf_string = CFHelpers.cf_string_from_unicode("Temporary oscrypto key")
passphrase_len = 16
rand_data = rand_bytes(10 + passphrase_len)
passphrase = rand_data[10:]
temp_filename = b32encode(rand_data[:10]).decode('utf-8')
temp_dir = tempfile.mkdtemp()
temp_path = os.path.join(temp_dir, temp_filename).encode('utf-8')
sec_keychain_ref_pointer = new(Security, 'SecKeychainRef *')
result = Security.SecKeychainCreate(
temp_path,
passphrase_len,
passphrase,
False,
null(),
sec_keychain_ref_pointer
)
handle_sec_error(result)
sec_keychain_ref = unwrap(sec_keychain_ref_pointer)
sec_access_ref_pointer = new(Security, 'SecAccessRef *')
result = Security.SecAccessCreate(cf_string, null(), sec_access_ref_pointer)
handle_sec_error(result)
sec_access_ref = unwrap(sec_access_ref_pointer)
data,
len(data),
buffer,
output_length
)
handle_sec_error(result)
return bytes_from_buffer(buffer, deref(output_length))
cf_signature = None
cf_data = None
cf_hash_length = None
sec_transform = None
try:
error_pointer = new(CoreFoundation, 'CFErrorRef *')
sec_transform = Security.SecSignTransformCreate(private_key.sec_key_ref, error_pointer)
handle_cf_error(error_pointer)
hash_constant = {
'md5': Security.kSecDigestMD5,
'sha1': Security.kSecDigestSHA1,
'sha224': Security.kSecDigestSHA2,
'sha256': Security.kSecDigestSHA2,
'sha384': Security.kSecDigestSHA2,
'sha512': Security.kSecDigestSHA2
}[hash_algorithm]
Security.SecTransformSetAttribute(
sec_transform,
Security.kSecDigestTypeAttribute,
hash_constant,
len(blob),
null(),
0,
key_handle_pointer
)
handle_error(res)
key_handle = unwrap(key_handle_pointer)
output = container(key_handle, key_object)
output.context_handle = context_handle
if algo == 'rsa':
ex_blob = _advapi32_create_blob(key_info, key_type, algo, signing=False)
ex_buffer = buffer_from_bytes(ex_blob)
ex_key_handle_pointer = new(advapi32, 'HCRYPTKEY *')
res = advapi32.CryptImportKey(
context_handle,
ex_buffer,
len(ex_blob),
null(),
0,
ex_key_handle_pointer
)
handle_error(res)
output.ex_key_handle = unwrap(ex_key_handle_pointer)
return output
except (Exception):
if key_handle: