Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
except socket.error:
logging.warn('Could not set TCP_CORK')
listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
logging.info('Binding to [%s]:%s', host, port)
try:
listener.bind((host, port))
except socket.error as e:
logging.error('Could not bind: %s', str(e))
sys.exit(1)
# Fix stupid socketio bug (it tries to do *args[0][0])
socket.socket.__getitem__ = lambda x, y: None
listener.listen(10)
gateway = GateMiddleware.get(aj.context)
application = HttpRoot(HttpMiddlewareAggregator([gateway])).dispatch
aj.server = SocketIOServer(
listener,
log=open(os.devnull, 'w'),
application=application,
handler_class=RequestHandler,
policy_server=False,
transports=[
str('websocket'),
str('flashsocket'),
str('xhr-polling'),
str('jsonp-polling'),
],
)
def init(plugin_manager):
import aj
api.TZManager.any(aj.context)
from .main import ItemProvider
from .views import Handler
def client_certificate_callback(self, connection, x509, errno, depth, result):
if depth == 0 and (errno == 9 or errno == 10):
return False # expired / not yet valid
if not aj.config.data['ssl']['client_auth']['force']:
return True
user = ClientCertificateVerificator.get(aj.context).verify(x509)
return bool(user)
def get_environ(self):
env = SocketIOHandler.get_environ(self)
env['SSL'] = isinstance(self.socket, gevent.ssl.SSLSocket)
env['SSL_CLIENT_AUTH_FORCE'] = aj.config.data['ssl']['client_auth']['force']
env['SSL_CLIENT_VALID'] = False
env['SSL_CLIENT_USER'] = None
if env['SSL']:
peer_cert = self.socket.getpeercert(True)
if peer_cert:
certificate = crypto.load_certificate(crypto.FILETYPE_PEM, gevent.ssl.DER_cert_to_PEM_cert(peer_cert))
env['SSL_CLIENT_CERTIFICATE'] = certificate
if certificate:
user = ClientCertificateVerificator.get(aj.context).verify(certificate)
env['SSL_CLIENT_VALID'] = bool(user)
env['SSL_CLIENT_USER'] = user
env['SSL_CLIENT_DIGEST'] = certificate.digest('sha1')
return env