def __init__(self):
    """Build a default configuration (named ``'test'``) on top of ``BaseConfig``.

    Calls ``aj.config.BaseConfig.__init__`` and then assigns a fixed
    dictionary to ``self.data``: a TCP bind address, a UI colour, a name
    and an ``ssl`` section with TLS switched off.
    """
    aj.config.BaseConfig.__init__(self)
    # NOTE(review): the dictionary literal below is truncated on this page —
    # the closing brace of the outer ``self.data`` dict is not visible here.
    self.data = {
        'bind': {
            'mode': 'tcp',
            # Binds every interface, so the port is reachable from the network
            # as soon as this config is used; with 'ssl.enable' False that is
            # a plaintext listener.
            'host': '0.0.0.0',
            'port': 8000,
        },
        'color': 'blue',
        'name': 'test',
        'ssl': {
            # TLS is off by default in this configuration.
            'enable': False
        }
def send_config_data(self):
    """Push the live ``aj.config.data`` dict to the peer on ``self.stream``.

    A debug line tagged with ``self.name`` is logged first; the message
    sent has ``type`` ``'config-data'`` and the config dict as ``data``.
    """
    logging.debug('Sending a config update to %s', self.name)
    message = {
        'type': 'config-data',
        'data': aj.config.data,
    }
    self.stream.send(message)
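def verify(self, x509):
    """Map a presented client certificate to a configured user name.

    Each entry of ``aj.config.data['ssl']['client_auth']['certificates']``
    pins one certificate by decimal serial number plus SHA-1 fingerprint;
    both must match. This only identifies *which* configured certificate
    was presented — presumably it runs from the TLS verify callback after
    the chain itself has been validated (confirm against the caller).

    :param x509: pyOpenSSL ``X509`` object for the peer certificate.
    :returns: the ``'user'`` of the first matching entry, or ``None`` when
        nothing matches (the caller decides what ``None`` means).
    """
    serial = x509.get_serial_number()
    # pyOpenSSL returns the fingerprint as colon-separated hex, which is the
    # form the config entries are compared against after UTF-8 encoding.
    digest = x509.digest('sha1')
    for entry in aj.config.data['ssl']['client_auth']['certificates']:
        # ``int`` (not the Python 2-only ``long``) parses an arbitrarily large
        # decimal serial on both Python 2 and 3 and compares equal to the
        # integer pyOpenSSL returns.
        if int(entry['serial']) == serial and entry['digest'].encode('utf-8') == digest:
            return entry['user']
    # No configured certificate matched; be explicit instead of falling off
    # the end of the loop.
    return None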
def handle_api_generate_client_certificate(self, http_context):
    """Issue a client certificate signed by this server's own certificate.

    Reads the subject fields ``c``, ``st``, ``o`` and ``cn`` from the JSON
    request body, generates a 4096-bit RSA key, signs a ten-year certificate
    with the key and certificate loaded from
    ``aj.config.data['ssl']['certificate']`` (one PEM file holding both), and
    starts building a PKCS#12 bundle of key + certificate. The method
    continues beyond what is shown on this page.

    NOTE(review): no authorization check is visible in this snippet —
    presumably the surrounding API layer restricts who may mint client
    certificates; confirm, since a certificate issued here is accepted by
    the client-auth path.
    """
    # The subject fields are copied straight from the request body with no
    # validation or length limits.
    data = json.loads(http_context.body)
    key = PKey()
    key.generate_key(TYPE_RSA, 4096)
    # Signing key and issuer certificate come from the same path, so that PEM
    # file must contain the private key as well. The file handles are never
    # closed explicitly (relies on refcount GC).
    ca_key = load_privatekey(FILETYPE_PEM, open(aj.config.data['ssl']['certificate']).read())
    ca_cert = load_certificate(FILETYPE_PEM, open(aj.config.data['ssl']['certificate']).read())
    cert = X509()
    cert.get_subject().countryName = data['c']
    cert.get_subject().stateOrProvinceName = data['st']
    cert.get_subject().organizationName = data['o']
    cert.get_subject().commonName = data['cn']
    cert.set_pubkey(key)
    # 160-bit serial from ``random``, which is not a cryptographic RNG; the
    # ``verify`` method on this page matches serial *and* fingerprint, so the
    # serial is not relied on alone, but a CSPRNG would still be the safer choice.
    cert.set_serial_number(random.getrandbits(8 * 20))
    cert.gmtime_adj_notBefore(0)
    # Valid for ten years from issuance.
    cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60)
    cert.set_issuer(ca_cert.get_subject())
    # SHA-1 signatures are deprecated for newly issued certificates; SHA-256
    # is the usual choice and is accepted by current TLS stacks.
    cert.sign(ca_key, 'sha1')
    pkcs = PKCS12()
    pkcs.set_certificate(cert)
    pkcs.set_privatekey(key)
    pkcs.set_friendlyname(str(data['cn']))
)
private_key = crypto.load_privatekey(
    crypto.FILETYPE_PEM,
    # Same PEM file as the certificate, so it must hold the private key too;
    # the handle is never closed explicitly (relies on refcount GC).
    open(aj.config.data['ssl']['certificate']).read()
)
context.use_certificate(certificate)
context.use_privatekey(private_key)
if aj.config.data['ssl']['client_auth']['enable']:
    # todo harden files
    logging.info('Enabling SSL client authentication')
    # The server's own certificate acts as the CA for client certificates: it
    # is advertised as the acceptable client CA and added to the trust store.
    context.add_client_ca(certificate)
    context.get_cert_store().add_cert(certificate)
    # VERIFY_PEER alone requests a client certificate but lets the handshake
    # proceed without one; 'force' adds VERIFY_FAIL_IF_NO_PEER_CERT so a
    # missing certificate aborts the handshake.
    verify_flags = SSL.VERIFY_PEER
    if aj.config.data['ssl']['client_auth']['force']:
        verify_flags |= SSL.VERIFY_FAIL_IF_NO_PEER_CERT
    # The final accept/reject decision is delegated to the authentication
    # service's callback (presumably where configured certificates are
    # matched — confirm).
    context.set_verify(verify_flags, AuthenticationService.get(aj.context).client_certificate_callback)
    # Depth 0 is meant to accept only certificates issued directly by the
    # trust anchor added above. NOTE(review): chain-length semantics of this
    # setting differ between OpenSSL releases — confirm against the deployed
    # version.
    context.set_verify_depth(0)
aj.server.ssl_args = {'server_side': True}
# Any keyword arguments passed from ssl_args are accepted via **ssl and
# ignored; the socket is wrapped with the context configured above.
aj.server.wrap_socket = lambda socket, **ssl: SSLSocket(context, socket)
logging.info('SSL enabled')
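# auth.log
# Route authentication messages to the syslog auth facility (auth.log on
# most distributions), tagged with the product name.
try:
    syslog.openlog(
        ident=str(aj.product),
        facility=syslog.LOG_AUTH,
    )
except Exception:
    # Older ``syslog.openlog`` signatures presumably reject the keyword form;
    # fall back to the positional call. ``except Exception`` instead of a bare
    # ``except`` so KeyboardInterrupt/SystemExit are not swallowed here.
    syslog.openlog(aj.product)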
def __init__(self, context):
    """Store the application context and ensure the auth config has a ``users`` mapping."""
    # Create ``aj.config.data['auth']['users']`` as an empty dict if it is absent.
    auth_section = aj.config.data['auth']
    auth_section.setdefault('users', {})
    self.context = context
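def handle_api_identity(self, http_context):
    """Describe the current session for the identity API endpoint.

    :param http_context: request context (unused; kept for the handler
        signature).
    :returns: a dict with the authenticated user, the server process's real
        and effective uid, whether sudo elevation is allowed, the provider
        profile, the configured machine name and hostname, and the UI colour.
    """
    # Resolve the service and the identity once so 'user' and 'profile'
    # describe the same principal and the lookups are not repeated.
    auth = AuthenticationService.get(self.context)
    identity = auth.get_identity()
    auth_config = aj.config.data['auth']
    return {
        'identity': {
            'user': identity,
            # uid/euid of the server process itself, not of the logged-in user.
            'uid': os.getuid(),
            'effective': os.geteuid(),
            'elevation_allowed': auth_config.get('allow_sudo', False),
            'profile': auth.get_provider().get_profile(identity),
        },
        'machine': {
            'name': aj.config.data['name'],
            'hostname': socket.gethostname(),
        },
        'color': aj.config.data.get('color', None),
    }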