Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# the default, and what is used by Security.framework on OS X also.
g = 2
try:
byte_size = bit_size // 8
if _backend == 'win':
alg_handle = open_alg_handle(BcryptConst.BCRYPT_RNG_ALGORITHM)
buffer = buffer_from_bytes(byte_size)
while True:
if _backend == 'winlegacy':
rb = os.urandom(byte_size)
else:
res = bcrypt.BCryptGenRandom(alg_handle, buffer, byte_size, 0)
handle_error(res)
rb = bytes_from_buffer(buffer)
p = int_from_bytes(rb)
# If a number is even, it can't be prime
if p % 2 == 0:
continue
# Perform the generator checks outlined in OpenSSL's
# dh_builtin_genparams() located in dh_gen.c
if g == 2:
if p % 24 != 11:
continue
elif g == 5:
rem = p % 10
if rem != 3 and rem != 7:
continue
ciphertext = ciphertext[::-1]
buffer = buffer_from_bytes(ciphertext)
out_len = new(advapi32, 'DWORD *', len(ciphertext))
res = advapi32.CryptDecrypt(
private_key.ex_key_handle,
null(),
True,
flags,
buffer,
out_len
)
handle_error(res)
return bytes_from_buffer(buffer, deref(out_len))
if padding is not None:
res = libcrypto.EVP_CIPHER_CTX_set_padding(evp_cipher_ctx, int(padding))
handle_openssl_error(res)
buffer = buffer_from_bytes(buffer_size)
output_length = new(libcrypto, 'int *')
res = libcrypto.EVP_EncryptUpdate(evp_cipher_ctx, buffer, output_length, data, len(data))
handle_openssl_error(res)
output = bytes_from_buffer(buffer, deref(output_length))
res = libcrypto.EVP_EncryptFinal_ex(evp_cipher_ctx, buffer, output_length)
handle_openssl_error(res)
output += bytes_from_buffer(buffer, deref(output_length))
return output
finally:
if evp_cipher_ctx:
libcrypto.EVP_CIPHER_CTX_free(evp_cipher_ctx)
evp_cipher = null()
res = libcrypto.EVP_EncryptInit_ex(evp_cipher_ctx, evp_cipher, null(), key, iv)
handle_openssl_error(res)
if padding is not None:
res = libcrypto.EVP_CIPHER_CTX_set_padding(evp_cipher_ctx, int(padding))
handle_openssl_error(res)
buffer = buffer_from_bytes(buffer_size)
output_length = new(libcrypto, 'int *')
res = libcrypto.EVP_EncryptUpdate(evp_cipher_ctx, buffer, output_length, data, len(data))
handle_openssl_error(res)
output = bytes_from_buffer(buffer, deref(output_length))
res = libcrypto.EVP_EncryptFinal_ex(evp_cipher_ctx, buffer, output_length)
handle_openssl_error(res)
output += bytes_from_buffer(buffer, deref(output_length))
return output
finally:
if evp_cipher_ctx:
libcrypto.EVP_CIPHER_CTX_free(evp_cipher_ctx)
self._handshake(renegotiate=True)
return self.read(max_length)
elif result != Secur32Const.SEC_E_OK:
handle_error(result, TLSError)
valid_buffer_types = set([
Secur32Const.SECBUFFER_EMPTY,
Secur32Const.SECBUFFER_STREAM_HEADER,
Secur32Const.SECBUFFER_STREAM_TRAILER
])
extra_amount = None
for buf in (buf0, buf1, buf2, buf3):
buffer_type = buf.BufferType
if buffer_type == Secur32Const.SECBUFFER_DATA:
output += bytes_from_buffer(buf.pvBuffer, buf.cbBuffer)
output_len = len(output)
elif buffer_type == Secur32Const.SECBUFFER_EXTRA:
extra_amount = native(int, buf.cbBuffer)
elif buffer_type not in valid_buffer_types:
raise OSError(pretty_message(
'''
Unexpected decrypt output buffer of type %s
''',
buffer_type
))
if extra_amount:
self._received_bytes = self._received_bytes[data_len - extra_amount:]
else:
self._received_bytes = self._received_bytes[data_len:]
rsa = libcrypto.EVP_PKEY_get1_RSA(certificate_or_public_key.evp_pkey)
if is_null(rsa):
handle_openssl_error(0)
buffer_size = libcrypto.EVP_PKEY_size(certificate_or_public_key.evp_pkey)
decrypted_buffer = buffer_from_bytes(buffer_size)
decrypted_length = libcrypto.RSA_public_decrypt(
len(signature),
signature,
decrypted_buffer,
rsa,
LibcryptoConst.RSA_PKCS1_PADDING
)
handle_openssl_error(decrypted_length)
decrypted_bytes = bytes_from_buffer(decrypted_buffer, decrypted_length)
if not constant_compare(data, decrypted_bytes):
raise SignatureError('Signature is invalid')
return
finally:
if rsa:
libcrypto.RSA_free(rsa)
evp_md_ctx = None
rsa = None
dsa = None
dsa_sig = None
ec_key = None
ecdsa_sig = None
res = bcrypt.BCryptEncrypt(
certificate_or_public_key.key_handle,
data,
len(data),
padding_info,
null(),
0,
buffer,
buffer_len,
out_len,
flags
)
handle_error(res)
return bytes_from_buffer(buffer, deref(out_len))
if is_null(rsa):
handle_openssl_error(0)
buffer_size = libcrypto.EVP_PKEY_size(private_key.evp_pkey)
signature_buffer = buffer_from_bytes(buffer_size)
signature_length = libcrypto.RSA_private_encrypt(
len(data),
data,
signature_buffer,
rsa,
LibcryptoConst.RSA_PKCS1_PADDING
)
handle_openssl_error(signature_length)
return bytes_from_buffer(signature_buffer, signature_length)
finally:
if rsa:
libcrypto.RSA_free(rsa)
evp_md_ctx = None
rsa = None
dsa = None
dsa_sig = None
ec_key = None
ecdsa_sig = None
try:
if libcrypto_version_info < (1, 1):
evp_md_ctx = libcrypto.EVP_MD_CTX_create()
else:
raise_disconnection()
elif error == LibsslConst.SSL_ERROR_WANT_WRITE:
self._raw_write()
again = True
continue
elif error == LibsslConst.SSL_ERROR_ZERO_RETURN:
self._gracefully_closed = True
self._shutdown(False)
break
else:
handle_openssl_error(0, TLSError)
output += bytes_from_buffer(self._read_buffer, result)
if self._gracefully_closed and len(output) == 0:
self._raise_closed()
self._decrypted_bytes = output[max_length:]
return output[0:max_length]
encoded_data = add_pss_padding(hash_algorithm, hash_length, private_key.bit_size, data)
key_length = private_key.byte_size
buffer = buffer_from_bytes(key_length)
output_length = new(Security, 'size_t *', key_length)
result = Security.SecKeyDecrypt(
private_key.sec_key_ref,
SecurityConst.kSecPaddingNone,
encoded_data,
len(encoded_data),
buffer,
output_length
)
handle_sec_error(result)
return bytes_from_buffer(buffer, deref(output_length))