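# Pre-flight check: make sure the client certificate and private key files
# can be opened, so a wrong path yields a clear command-line error instead
# of a failure deep inside OpenSSL.
if args_command_list.cert:
    try:
        # The handle is only a readability probe; close it right away
        # instead of leaking it.
        with open(args_command_list.cert, "r"):
            pass
    except (IOError, OSError):
        # Translate I/O failures only; a bare except would also swallow
        # KeyboardInterrupt and SystemExit.
        raise CommandLineParsingError('Could not open the client certificate file "' + str(args_command_list.cert) + '".')

if args_command_list.key:
    try:
        with open(args_command_list.key, "r"):
            pass
    except (IOError, OSError):
        raise CommandLineParsingError('Could not open the client private key file "' + str(args_command_list.key) + '"')

    # Try to load the cert and key in OpenSSL. This is the real validation:
    # the open() probes above only show the paths are readable.
    # NOTE(review): the listing lost its indentation; this load is placed
    # under the key check because it only makes sense when a key was
    # supplied - confirm against the original file.
    # NOTE(review): keypass appears to come from the command line, where it
    # can be exposed through the process list and shell history - confirm
    # against the option parser and consider prompting for it instead.
    try:
        sslClient = SslClient()
        sslClient.use_private_key(
            args_command_list.cert,
            args_command_list.certform,
            args_command_list.key,
            args_command_list.keyform,
            args_command_list.keypass,
        )
    except _nassl.OpenSSLError as e:
        # The wrong-passphrase case is recognised by matching the error
        # text, so it depends on the wording of the OpenSSL message.
        if 'bad decrypt' in str(e.args):
            raise CommandLineParsingError('Could not decrypt the private key. Wrong passphrase ?')
        raise CommandLineParsingError('Could not load the certificate or the private key. Passphrase needed ?')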
# HTTP CONNECT proxy
shared_settings['https_tunnel_host'] = None
if args_command_list.https_tunnel:
def process_task(self, target, command, args):
    """Queue one cipher-suite probe per locally available cipher for *target*.

    ``command`` selects the protocol version ('sslv2', 'sslv3', 'tlsv1',
    'tlsv1_1' or 'tlsv1_2'); any other value raises ``Exception``. ``args``
    is not used in the lines shown here, and nothing is returned by them.
    """
    MAX_THREADS = 30
    versions_by_command = {
        'sslv2': SSLV2,
        'sslv3': SSLV3,
        'tlsv1': TLSV1,
        'tlsv1_1': TLSV1_1,
        'tlsv1_2': TLSV1_2,
    }
    try:
        sslVersion = versions_by_command[command]
    except KeyError:
        raise Exception("PluginOpenSSLCipherSuites: Unknown command.")

    # 'ALL:COMPLEMENTOFALL' asks the local OpenSSL for every suite it has,
    # including the ones 'ALL' leaves out. Coverage is therefore bounded by
    # the local build: a suite this client cannot offer is never probed, so
    # its absence from the results does not show that the server rejects it.
    probe_client = SslClient(sslVersion=sslVersion)
    probe_client.set_cipher_list('ALL:COMPLEMENTOFALL')
    cipher_list = probe_client.get_cipher_list()

    # One thread per cipher, capped at MAX_THREADS.
    # NOTE(review): NB_THREADS is not used in the lines shown here;
    # presumably it is passed on when the pool is started - confirm.
    NB_THREADS = min(len(cipher_list), MAX_THREADS)
    thread_pool = ThreadPool()

    # One job per available cipher suite, then one for the preferred suite.
    jobs = [(self._test_ciphersuite, (target, sslVersion, cipher))
            for cipher in cipher_list]
    jobs.append((self._pref_ciphersuite, (target, sslVersion)))
    for job in jobs:
        thread_pool.add_job(job)
def process_task(self, target, command, args):
    """Queue one cipher-suite probe per locally available cipher for *target*.

    ``command`` selects the protocol version ('sslv2', 'sslv3', 'tlsv1',
    'tlsv1_1' or 'tlsv1_2'); any other value raises ``Exception``. ``args``
    is not used in the lines shown here, and nothing is returned by them.
    """
    MAX_THREADS = 15
    versions_by_command = {
        'sslv2': SSLV2,
        'sslv3': SSLV3,
        'tlsv1': TLSV1,
        'tlsv1_1': TLSV1_1,
        'tlsv1_2': TLSV1_2,
    }
    try:
        sslVersion = versions_by_command[command]
    except KeyError:
        raise Exception("PluginOpenSSLCipherSuites: Unknown command.")

    # 'ALL:COMPLEMENTOFALL' asks the local OpenSSL for every suite it has,
    # including the ones 'ALL' leaves out. Coverage is therefore bounded by
    # the local build: a suite this client cannot offer is never probed, so
    # its absence from the results does not show that the server rejects it.
    probe_client = SslClient(sslVersion=sslVersion)
    probe_client.set_cipher_list('ALL:COMPLEMENTOFALL')
    cipher_list = probe_client.get_cipher_list()

    # One thread per cipher, capped at MAX_THREADS.
    # NOTE(review): NB_THREADS is not used in the lines shown here;
    # presumably it is passed on when the pool is started - confirm.
    NB_THREADS = min(len(cipher_list), MAX_THREADS)
    thread_pool = ThreadPool()

    # One job per available cipher suite, then one for the preferred suite.
    jobs = [(self._test_ciphersuite, (target, sslVersion, cipher))
            for cipher in cipher_list]
    jobs.append((self._pref_ciphersuite, (target, sslVersion)))
    for job in jobs:
        thread_pool.add_job(job)
# Message template; {0} receives the list of CA names named in the server's
# client-certificate request.
ERROR_MSG = (
    'Server requested a client certificate signed by one of the '
    'following CAs: {0}; use the --cert and --key options.'
)
def __init__(self, caList):
    """Keep the CA names that go into the error message."""
    # Stored as given (no copy); __str__ iterates it when rendering.
    self.caList = caList
def __str__(self):
    """Render ERROR_MSG with every CA name followed by a single space."""
    # Each name keeps its trailing space, so the rendered list ends with one.
    ca_names = ''.join(ca + ' ' for ca in self.caList)
    return self.ERROR_MSG.format(ca_names)
class SSLConnection(SslClient):
"""Base SSL connection class."""
# The following errors mean that the server explicitly rejected the
# handshake. The goal is to differentiate rejected handshakes from random
# network errors such as the server going offline, etc.
# NOTE(review): the keys look like lower-case fragments of the platform's
# socket error text; confirm how callers match them, since that wording can
# vary by OS and locale and a miss would misclassify a rejection as a
# network error.
HANDSHAKE_REJECTED_SOCKET_ERRORS = {
    'was forcibly closed': 'Received FIN',
    'reset by peer': 'Received RST',
}
HANDSHAKE_REJECTED_SSL_ERRORS = \
{'sslv3 alert handshake failure' : 'Alert handshake failure',
'no ciphers available' : 'No ciphers available',
'excessive message size' : 'Excessive message size',
'bad mac decode' : 'Bad mac decode',
'wrong version number' : 'Wrong version number',
'no cipher match' : 'No cipher match',