Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _test_server(cls, targetStr, shared_settings):
    """Test connectivity to one single server.

    Parses ``targetStr`` into a host and port, creates a connection to it
    and runs the pre-handshake step (StartTLS if needed). Connection
    failures are translated into ``InvalidTargetError``.

    Args:
        targetStr: Target string handed to
            ``TargetStringParser.parse_target_str``.
        shared_settings: Scan settings; the ``'starttls'`` entry selects the
            default port and the whole mapping is passed on to
            ``create_sslyze_connection``.

    Raises:
        InvalidTargetError: On a socket timeout, a name-resolution failure,
            any other socket error, or a StartTLS error.
    """
    # Parse the target string. Any KeyError during the port lookup falls
    # back to the generic default port.
    try:
        defaultPort = cls.DEFAULT_PORTS[shared_settings['starttls']]
    except KeyError:
        defaultPort = cls.DEFAULT_PORTS['default']
    (host, port) = TargetStringParser.parse_target_str(targetStr, defaultPort)

    # First try to connect and do StartTLS if needed
    sslCon = create_sslyze_connection((host, host, port, SSLV23), shared_settings)
    try:
        sslCon.do_pre_handshake()
        ipAddr = sslCon._sock.getpeername()[0]

    # Socket errors. socket.timeout and socket.gaierror are subclasses of
    # socket.error, so they have to be handled before the generic clause.
    except socket.timeout:  # Host is down
        raise InvalidTargetError(targetStr, cls.ERR_TIMEOUT)
    except socket.gaierror:
        raise InvalidTargetError(targetStr, cls.ERR_NAME_NOT_RESOLVED)
    except socket.error:  # Connection Refused
        raise InvalidTargetError(targetStr, cls.ERR_REJECTED)

    # StartTLS errors
    except StartTLSError as e:
        # e.args[0] instead of e[0]: indexing an exception object is
        # deprecated since Python 2.6 and removed in Python 3, while
        # .args works on both.
        raise InvalidTargetError(targetStr, e.args[0])
    # NOTE(review): this excerpt stops here; closing sslCon and returning a
    # result are not part of the visible code - confirm against the full
    # function.
def create_sslyze_connection(shared_settings, sslVersion=SSLV23, sslVerifyLocations=None):
"""
Utility function to create the proper SSLConnection based on what's
in the shared_settings. All plugins should use this for their SSL
connections.
"""
# Create the proper SMTP / XMPP / HTTPS / Proxy connection
timeout = shared_settings['timeout']
if shared_settings['starttls'] == 'smtp':
sslConn = SMTPConnection(sslVersion, sslVerifyLocations, timeout)
elif shared_settings['starttls'] == 'ftp':
sslConn = FTPConnection(sslVersion, sslVerifyLocations, timeout)
elif shared_settings['starttls'] == 'xmpp':
raise InvalidTargetError(targetStr, e[0])
# Other errors
except Exception as e:
raise InvalidTargetError(targetStr, '{0}: {1}'.format(str(type(e).__name__), e[0]))
finally:
sslCon.close()
# Then try to do SSL handshakes just to figure out the SSL version
# supported by the server; the plugins need to know this in advance.
# If the handshakes fail, we keep going anyway; maybe the server
# only supports exotic cipher suites
sslSupport = SSLV23
# No connection retry when testing connectivity
tweak_shared_settings = shared_settings.copy()
tweak_shared_settings['nb_retries'] = 1
for sslVersion in [TLSV1, SSLV23, SSLV3, TLSV1_2]:
sslCon = create_sslyze_connection((host, ipAddr, port, sslVersion),
tweak_shared_settings)
try:
sslCon.connect()
except:
pass
else:
sslSupport = sslVersion
break
finally:
sslCon.close()
raise InvalidTargetError(targetStr, '{0}: {1}'.format(str(type(e).__name__), e[0]))
finally:
sslCon.close()
# Then try to do SSL handshakes just to figure out the SSL version
# supported by the server; the plugins need to know this in advance.
# If the handshakes fail, we keep going anyway; maybe the server
# only supports exotic cipher suites
sslSupport = SSLV23
# No connection retry when testing connectivity
tweak_shared_settings = shared_settings.copy()
tweak_shared_settings['nb_retries'] = 1
for sslVersion in [TLSV1, SSLV23, SSLV3, TLSV1_2]:
sslCon = create_sslyze_connection((host, ipAddr, port, sslVersion),
tweak_shared_settings)
try:
sslCon.connect()
except:
pass
else:
sslSupport = sslVersion
break
finally:
sslCon.close()
return host, ipAddr, port, sslSupport
def process_task(self, target, command, args):
(host, ip, port, sslVersion) = target
if sslVersion == SSLV23: # Could not determine the preferred SSL version - client cert was required ?
sslVersion = TLSV1 # Default to TLS 1.0
target = (host, ip, port, sslVersion)
sslConn = create_sslyze_connection(target, self._shared_settings)
sslConn.sslVersion = sslVersion # Needed by the heartbleed payload
# Awful hack #1: replace nassl.sslClient.do_handshake() with a heartbleed
# checking SSL handshake so that all the SSLyze options
# (startTLS, proxy, etc.) still work
sslConn.do_handshake = new.instancemethod(do_handshake_with_heartbleed, sslConn, None)
heartbleed = None
try: # Perform the SSL handshake
sslConn.connect()
except HeartbleedSent:
# Awful hack #2: directly read the underlying network socket
# Let's try to open the cert and key files
# NOTE(review): the indentation of this excerpt was lost, so the nesting of
# the statements below cannot be confirmed from here.
# NOTE(review): open() is only used to check that the file can be opened;
# the returned file object is discarded without being closed. The bare
# "except:" clauses also intercept KeyboardInterrupt and SystemExit, not
# only I/O failures.
if args_command_list.cert:
try:
open(args_command_list.cert,"r")
except:
raise CommandLineParsingError('Could not open the client certificate file "' + str(args_command_list.cert) + '".')
if args_command_list.key:
try:
open(args_command_list.key,"r")
except:
raise CommandLineParsingError('Could not open the client private key file "' + str(args_command_list.key) + '"')
# Try to load the cert and key in OpenSSL
try:
sslClient = SslClient()
sslClient.use_private_key(args_command_list.cert,
args_command_list.certform,
args_command_list.key,
args_command_list.keyform,
args_command_list.keypass)
except _nassl.OpenSSLError as e:
# An error mentioning 'bad decrypt' is reported as a passphrase problem;
# every other OpenSSLError gets the generic message below.
if 'bad decrypt' in str(e.args):
raise CommandLineParsingError('Could not decrypt the private key. Wrong passphrase ?')
raise CommandLineParsingError('Could not load the certificate or the private key. Passphrase needed ?')
# HTTP CONNECT proxy
shared_settings['https_tunnel_host'] = None
if args_command_list.https_tunnel:
def process_task(self, target, command, args):
    """Queue cipher-suite scan jobs for the SSL/TLS version named by command.

    Args:
        target: Passed through unchanged to every queued job.
        command: One of 'sslv2', 'sslv3', 'tlsv1', 'tlsv1_1' or 'tlsv1_2'.
        args: Not read by the part of the method shown here.

    Raises:
        Exception: If command is not one of the names listed above.
    """
    MAX_THREADS = 30
    sslVersionDict = {
        'sslv2': SSLV2,
        'sslv3': SSLV3,
        'tlsv1': TLSV1,
        'tlsv1_1': TLSV1_1,
        'tlsv1_2': TLSV1_2,
    }

    # Reject unknown commands before doing any work.
    if command not in sslVersionDict:
        raise Exception("PluginOpenSSLCipherSuites: Unknown command.")
    sslVersion = sslVersionDict[command]

    # Get the list of available cipher suites for the given ssl version
    sslClient = SslClient(sslVersion=sslVersion)
    sslClient.set_cipher_list('ALL:COMPLEMENTOFALL')
    cipher_list = sslClient.get_cipher_list()

    # Pool sizing: one thread per cipher, capped at MAX_THREADS.
    NB_THREADS = min(len(cipher_list), MAX_THREADS)
    thread_pool = ThreadPool()

    # One job for every available cipher suite...
    for cipher in cipher_list:
        cipher_job = (self._test_ciphersuite, (target, sslVersion, cipher))
        thread_pool.add_job(cipher_job)

    # ...plus one job for the preferred cipher suite.
    preferred_job = (self._pref_ciphersuite, (target, sslVersion))
    thread_pool.add_job(preferred_job)
def process_task(self, target, command, args):
    """Schedule the cipher-suite tests for one SSL/TLS protocol version.

    The protocol version is looked up from ``command`` ('sslv2', 'sslv3',
    'tlsv1', 'tlsv1_1' or 'tlsv1_2'). A ``_test_ciphersuite`` job is queued
    for each cipher suite reported by a local ``SslClient``, followed by a
    single ``_pref_ciphersuite`` job. ``target`` is forwarded to every job;
    ``args`` is not read by the part of the method shown here.

    Raises:
        Exception: If ``command`` does not name a known protocol version.
    """
    MAX_THREADS = 15
    sslVersionDict = {
        'sslv2': SSLV2,
        'sslv3': SSLV3,
        'tlsv1': TLSV1,
        'tlsv1_1': TLSV1_1,
        'tlsv1_2': TLSV1_2,
    }

    # Guard clause: only the five version names above are accepted.
    if command not in sslVersionDict:
        raise Exception("PluginOpenSSLCipherSuites: Unknown command.")
    sslVersion = sslVersionDict[command]

    # Get the list of available cipher suites for the given ssl version
    sslClient = SslClient(sslVersion=sslVersion)
    sslClient.set_cipher_list('ALL:COMPLEMENTOFALL')
    cipher_list = sslClient.get_cipher_list()

    # One thread per cipher, never more than MAX_THREADS.
    NB_THREADS = min(len(cipher_list), MAX_THREADS)
    thread_pool = ThreadPool()

    # Scan for every available cipher suite
    for cipher in cipher_list:
        job_args = (target, sslVersion, cipher)
        thread_pool.add_job((self._test_ciphersuite, job_args))

    # Scan for the preferred cipher suite
    pref_args = (target, sslVersion)
    thread_pool.add_job((self._pref_ciphersuite, pref_args))
# Message template; {0} receives the space-separated CA list built in __str__.
ERROR_MSG = ('Server requested a client certificate signed by one of the '
             'following CAs: {0}; use the --cert and --key options.')

def __init__(self, caList):
    """Remember the CA list that is reported by __str__."""
    self.caList = caList

def __str__(self):
    """Return ERROR_MSG with every entry of caList followed by one space."""
    # Each entry keeps its trailing space, so the last one is followed by a
    # space as well.
    ca_text = ''.join(ca + ' ' for ca in self.caList)
    return self.ERROR_MSG.format(ca_text)
class SSLConnection(SslClient):
"""Base SSL connection class."""
# The following errors mean that the server explicitly rejected the
# handshake. The goal is to differentiate rejected handshakes from random
# network errors such as the server going offline, etc.
HANDSHAKE_REJECTED_SOCKET_ERRORS = \
{'was forcibly closed' : 'Received FIN',
'reset by peer' : 'Received RST'}
HANDSHAKE_REJECTED_SSL_ERRORS = \
{'sslv3 alert handshake failure' : 'Alert handshake failure',
'no ciphers available' : 'No ciphers available',
'excessive message size' : 'Excessive message size',
'bad mac decode' : 'Bad mac decode',
'wrong version number' : 'Wrong version number',
'no cipher match' : 'No cipher match',
def process_task(self, target, command, args):
MAX_THREADS = 15
sslVersionDict = {'sslv2': SSLV2,
'sslv3': SSLV3,
'tlsv1': TLSV1,
'tlsv1_1': TLSV1_1,
'tlsv1_2': TLSV1_2}
try:
sslVersion = sslVersionDict[command]
except KeyError:
raise Exception("PluginOpenSSLCipherSuites: Unknown command.")
# Get the list of available cipher suites for the given ssl version
sslClient = SslClient(sslVersion=sslVersion)
sslClient.set_cipher_list('ALL:COMPLEMENTOFALL')
cipher_list = sslClient.get_cipher_list()
# Create a thread pool
NB_THREADS = min(len(cipher_list), MAX_THREADS) # One thread per cipher
thread_pool = ThreadPool()