Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# NOTE(review): this is an excerpt from the body of a larger method; the
# original indentation was lost, so the nesting of the statements below has
# to be confirmed against the full source.
# Repeatedly hand `data` to SSLWrite until nothing is left to send.
data_len = len(data)
while data_len:
# A new buffer is built from whatever part of `data` is still unsent
write_buffer = buffer_from_bytes(data)
result = Security.SSLWrite(
self._session_context,
write_buffer,
data_len,
processed_pointer
)
# If an exception has been stored on self._exception, clear the attribute
# and re-raise it here. (Presumably it is set by an I/O callback that runs
# during SSLWrite - TODO confirm where it is assigned.)
if self._exception is not None:
exception = self._exception
self._exception = None
raise exception
handle_sec_error(result, TLSError)
# processed_pointer holds the number of bytes SSLWrite reported as handled;
# slice those off the front of `data` and recompute the remaining length
bytes_written = deref(processed_pointer)
data = data[bytes_written:]
data_len = len(data)
# With data still pending, call select_write() before the next attempt
# (presumably this waits for the socket to become writable - verify)
if data_len > 0:
self.select_write()
to_read,
processed_pointer
)
# NOTE(review): excerpt from the body of a larger method - the call that
# produced `result` begins above this excerpt and the original indentation
# was lost, so nesting must be confirmed against the full source.
# If an exception has been stored on self._exception, clear the attribute and
# re-raise it.
if self._exception is not None:
exception = self._exception
self._exception = None
raise exception
# errSSLWouldBlock and errSSLClosedGraceful are the only non-zero results that
# are not passed to handle_sec_error()
if result and result not in set([SecurityConst.errSSLWouldBlock, SecurityConst.errSSLClosedGraceful]):
handle_sec_error(result, TLSError)
# On a graceful close: record the fact, shut the connection down and then call
# self._raise_closed()
if result and result == SecurityConst.errSSLClosedGraceful:
self._gracefully_closed = True
self._shutdown(False)
self._raise_closed()
# Append the newly read bytes to those already held in self._decrypted_bytes,
# return at most max_length of them and keep the overflow in
# self._decrypted_bytes
bytes_read = deref(processed_pointer)
output = self._decrypted_bytes + bytes_from_buffer(read_buffer, bytes_read)
self._decrypted_bytes = output[max_length:]
return output[0:max_length]
handle_sec_error(result)
# NOTE(review): excerpt from the body of a larger function; the original
# indentation was lost, so nesting must be confirmed against the full source.
# When disable_auto_validation is truthy, set the
# kSSLSessionOptionBreakOnServerAuth option on the session (presumably so the
# handshake pauses and the caller can validate the server certificate itself -
# TODO confirm).
if disable_auto_validation:
result = Security.SSLSetSessionOption(
session_context,
SecurityConst.kSSLSessionOptionBreakOnServerAuth,
True
)
handle_sec_error(result)
# Disable all sorts of bad cipher suites
# Step 1: ask how many cipher suites are supported
supported_ciphers_pointer = new(Security, 'size_t *')
result = Security.SSLGetNumberSupportedCiphers(session_context, supported_ciphers_pointer)
handle_sec_error(result)
supported_ciphers = deref(supported_ciphers_pointer)
# Step 2: size a buffer at 4 bytes per suite and view it as a uint32_t array
# (presumably buffer_from_bytes() allocates that many bytes when given an
# integer - verify against the helper)
cipher_buffer = buffer_from_bytes(supported_ciphers * 4)
supported_cipher_suites_pointer = cast(Security, 'uint32_t *', cipher_buffer)
result = Security.SSLGetSupportedCiphers(
session_context,
supported_cipher_suites_pointer,
supported_ciphers_pointer
)
handle_sec_error(result)
# The count pointer was passed to the call again, so the count is re-read
# from it before being used
supported_ciphers = deref(supported_ciphers_pointer)
supported_cipher_suites = array_from_pointer(
Security,
'uint32_t',
supported_cipher_suites_pointer,
supported_ciphers
# NOTE(review): excerpt from the body of a larger function - the `try:` that
# pairs with the `finally:` below is above this excerpt, and the original
# indentation was lost.
# `data` is copied into a buffer and out_len is initialised with its length;
# both are handed to CryptDecrypt, and the result is read back out of the same
# buffer using the length found in out_len after the call.
buffer = buffer_from_bytes(data)
out_len = new(advapi32, 'DWORD *', len(data))
res = advapi32.CryptDecrypt(
key_handle,
null(),
# To skip padding, we have to tell the API that this is not
# the final block
False if cipher == 'aes' and not padding else True,
0,
buffer,
out_len
)
handle_error(res)
return bytes_from_buffer(buffer, deref(out_len))
finally:
# Release whichever of the two handles is set
if key_handle:
advapi32.CryptDestroyKey(key_handle)
if context_handle:
close_context_handle(context_handle)
))
# NOTE(review): excerpt from the body of a larger function; the original
# indentation was lost, so it cannot be told from here which statements are
# nested under the `if` below.
# First call: null() and 0 are passed for the output buffer, and the value
# left in out_len is then used as the buffer size (per the BCryptSignHash
# documentation a NULL output buffer makes the call report the required size).
out_len = new(bcrypt, 'DWORD *')
res = bcrypt.BCryptSignHash(
private_key.key_handle,
padding_info,
digest,
len(digest),
null(),
0,
out_len,
flags
)
handle_error(res)
# Allocate an output buffer of the reported length
buffer_len = deref(out_len)
buffer = buffer_from_bytes(buffer_len)
# For RSA keys padding_info is re-created from padding_info_struct_pointer
# before it is used again
if private_key.algorithm == 'rsa':
padding_info = cast(bcrypt, 'void *', padding_info_struct_pointer)
# Second call: same key, digest and flags, now with the real output buffer
res = bcrypt.BCryptSignHash(
private_key.key_handle,
padding_info,
digest,
len(digest),
buffer,
buffer_len,
out_len,
flags
)
handle_error(res)
# NOTE(review): excerpt from the body of a larger function; the original
# indentation was lost.
# First call: null() and 0 are passed for both the IV and the output buffer,
# and the value left in out_len is then used as the output buffer size (per
# the BCryptEncrypt documentation a NULL output buffer makes the call report
# the required size).
out_len = new(bcrypt, 'ULONG *')
res = bcrypt.BCryptEncrypt(
certificate_or_public_key.key_handle,
data,
len(data),
padding_info,
null(),
0,
null(),
0,
out_len,
flags
)
handle_error(res)
# Allocate an output buffer of the reported length
buffer_len = deref(out_len)
buffer = buffer_from_bytes(buffer_len)
# Second call: identical inputs, now writing into the allocated buffer
res = bcrypt.BCryptEncrypt(
certificate_or_public_key.key_handle,
data,
len(data),
padding_info,
null(),
0,
buffer,
buffer_len,
out_len,
flags
)
handle_error(res)
# NOTE(review): excerpt from the body of a larger function; buffer_len,
# out_len and flags are defined above this excerpt.
# A buffer of buffer_len is created, `data` is written into it and out_len is
# set to the length of `data`; CryptEncrypt is then given the same buffer
# together with out_len and the total buffer length.
buffer = buffer_from_bytes(buffer_len)
write_to_buffer(buffer, data)
pointer_set(out_len, len(data))
res = advapi32.CryptEncrypt(
certificate_or_public_key.ex_key_handle,
null(),
True,
flags,
buffer,
out_len,
buffer_len
)
handle_error(res)
# The bytes read back from the buffer are returned in reversed order
# (presumably because CryptoAPI emits its output little-endian - TODO confirm)
return bytes_from_buffer(buffer, deref(out_len))[::-1]
# NOTE(review): excerpt from the body of a larger function; hash_handle is
# created above this excerpt and the statement that follows the final comment
# is below it.
# Feed `data` into the hash object
res = advapi32.CryptHashData(hash_handle, data, len(data), 0)
handle_error(res)
# First CryptSignHashW call: null() is passed for the signature buffer, and
# the value left in out_len is then used as the buffer size (per the
# CryptSignHash documentation a NULL buffer makes the call report the
# required size).
out_len = new(advapi32, 'DWORD *')
res = advapi32.CryptSignHashW(
hash_handle,
Advapi32Const.AT_SIGNATURE,
null(),
0,
null(),
out_len
)
handle_error(res)
# Allocate a buffer of the reported length and sign into it
buffer_length = deref(out_len)
buffer_ = buffer_from_bytes(buffer_length)
res = advapi32.CryptSignHashW(
hash_handle,
Advapi32Const.AT_SIGNATURE,
null(),
0,
buffer_,
out_len
)
handle_error(res)
output = bytes_from_buffer(buffer_, deref(out_len))
# CryptoAPI outputs the signature in little endian byte order, so we
# must swap it for compatibility with other systems
# NOTE(review): excerpt from the body of a larger function - the `try:` that
# pairs with the `finally:` below, and the setup of key_handle, iv_buffer,
# buffer and out_len, are above this excerpt.
# Encrypt `data` with key_handle, passing null() for the padding info and
# supplying iv_buffer/iv_len; the output lands in `buffer`.
res = bcrypt.BCryptEncrypt(
key_handle,
data,
len(data),
null(),
iv_buffer,
iv_len,
buffer,
buffer_len,
out_len,
flags
)
handle_error(res)
# Only the number of bytes reported through out_len is returned
return bytes_from_buffer(buffer, deref(out_len))
finally:
# Destroy the key handle if one is set
if key_handle:
bcrypt.BCryptDestroyKey(key_handle)