Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# NOTE(review): the condition guarding this warning lies outside the visible
# excerpt; it is kept at the same level as in the source shown here.
logging.warning('No plugins were loaded!')

# Create the listening socket according to the configured bind mode.

# Unix mode: bind a stream socket at the configured filesystem path.
if aj.config.data['bind']['mode'] == 'unix':
    path = aj.config.data['bind']['socket']
    # Remove a leftover socket file so the bind below starts from a clean path.
    if os.path.exists(path):
        os.unlink(path)
    listener = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
        listener.bind(path)
    except OSError:
        logging.error('Could not bind to %s', path)
        sys.exit(1)

# TCP mode: bind to the configured host/port pair.
if aj.config.data['bind']['mode'] == 'tcp':
    host = aj.config.data['bind']['host']
    port = aj.config.data['bind']['port']
    # A colon in the host string is taken to mean an IPv6 address.
    listener = socket.socket(
        socket.AF_INET6 if ':' in host else socket.AF_INET, socket.SOCK_STREAM
    )
    # TCP_CORK is not attempted on FreeBSD / OSX; elsewhere it is best-effort.
    if aj.platform not in ['freebsd', 'osx']:
        try:
            listener.setsockopt(socket.IPPROTO_TCP, socket.TCP_CORK, 1)
        except socket.error:
            # logging.warn is a deprecated alias; logging.warning is the supported call.
            logging.warning('Could not set TCP_CORK')
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    logging.info('Binding to [%s]:%s', host, port)
    try:
        listener.bind((host, port))
    except socket.error as e:
        # A failed bind is fatal: log the reason and stop the process.
        logging.error('Could not bind: %s', e)
        sys.exit(1)
def handle_api_identity(self, http_context):
    """
    Describe who is using the panel and which machine it runs on.

    :param http_context: request context (not used by this handler)
    :returns: dict with three keys:
        ``identity`` - authenticated user, real and effective UID of this
        process, the ``allow_sudo`` auth setting (default ``False``) and the
        auth provider's profile for the user;
        ``machine`` - configured name and the system hostname;
        ``color`` - configured color or ``None``.
    """
    identity = {
        'user': AuthenticationService.get(self.context).get_identity(),
        'uid': os.getuid(),
        'effective': os.geteuid(),
        'elevation_allowed': aj.config.data['auth'].get('allow_sudo', False),
    }

    # The profile is looked up through the active provider for the same identity.
    provider = AuthenticationService.get(self.context).get_provider()
    identity['profile'] = provider.get_profile(
        AuthenticationService.get(self.context).get_identity()
    )

    machine = {
        'name': aj.config.data['name'],
        'hostname': socket.gethostname(),
    }

    return {
        'identity': identity,
        'machine': machine,
        'color': aj.config.data.get('color'),
    }
def handle_api_generate_client_certificate(self, http_context):
    """
    Generate a client key pair and a certificate signed with the panel's key.

    The JSON request body must contain the subject fields ``c`` (country),
    ``st`` (state or province), ``o`` (organization) and ``cn`` (common name).
    A fresh 4096-bit RSA key is generated, the certificate is made valid from
    now for ten years, issued under the subject of the configured SSL
    certificate, and both are placed into a PKCS12 container.

    :param http_context: request context whose ``body`` holds the JSON payload
    :raises KeyError: if a required subject field is missing from the payload
    """
    data = json.loads(http_context.body)

    key = PKey()
    key.generate_key(TYPE_RSA, 4096)

    # The configured file is used as both private key and certificate, so read
    # it once and close the handle instead of leaking two open files.
    with open(aj.config.data['ssl']['certificate']) as pem_file:
        pem = pem_file.read()
    ca_key = load_privatekey(FILETYPE_PEM, pem)
    ca_cert = load_certificate(FILETYPE_PEM, pem)

    cert = X509()
    subject = cert.get_subject()
    subject.countryName = data['c']
    subject.stateOrProvinceName = data['st']
    subject.organizationName = data['o']
    subject.commonName = data['cn']
    cert.set_pubkey(key)
    # 20 random bytes for the serial number.
    cert.set_serial_number(random.getrandbits(8 * 20))
    cert.gmtime_adj_notBefore(0)
    cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60)  # ten years, in seconds
    cert.set_issuer(ca_cert.get_subject())
    # NOTE(review): SHA-1 signatures are widely rejected by current TLS stacks;
    # consider 'sha256' once existing consumers are confirmed to accept it.
    cert.sign(ca_key, 'sha1')

    pkcs = PKCS12()
    pkcs.set_certificate(cert)
    pkcs.set_privatekey(key)
    # NOTE(review): the visible excerpt ends here without exporting or
    # returning the bundle - confirm against the complete source.
def handle_api_config(self, http_context):
    """
    Read (GET) or update (POST) the panel configuration.

    Only available when the process runs with real UID 0; otherwise the
    request is rejected with ``EndpointReturn(403)``.

    * GET  - under ``core:config:read``: reload the master config and return it.
    * POST - under ``core:config:write``: merge the JSON body into the config,
      save it, reload the master config and return the result.

    Any other HTTP method falls through and yields ``None``.

    :param http_context: request context providing ``method`` and ``body``
    :raises EndpointReturn: with status 403 when not running as root
    """
    running_as_root = os.getuid() == 0
    if not running_as_root:
        raise EndpointReturn(403)

    worker = self.context.worker

    if http_context.method == 'GET':
        with authorize('core:config:read'):
            worker.reload_master_config()
            return aj.config.data

    if http_context.method == 'POST':
        with authorize('core:config:write'):
            changes = json.loads(http_context.body)
            aj.config.data.update(changes)
            aj.config.save()
            worker.reload_master_config()
            # Read the config again after the reload rather than reusing an
            # earlier reference.
            return aj.config.data
def client_certificate_callback(self, connection, x509, errno, depth, result):
    """
    Decide whether a client certificate is accepted during verification.

    :param connection: connection being verified (unused)
    :param x509: certificate under inspection
    :param errno: verification error number reported for this certificate
    :param depth: position in the chain; 0 is the peer's own certificate
    :param result: verification result so far (unused)
    :returns: ``False`` for an expired / not-yet-valid peer certificate,
        ``True`` when client auth is not forced by config, otherwise whether
        the certificate verificator maps the certificate to a user.
    """
    # Error numbers 9 and 10 mean the certificate is expired / not yet valid.
    validity_error = errno in (9, 10)
    if depth == 0 and validity_error:
        return False

    client_auth_forced = aj.config.data['ssl']['client_auth']['force']
    if not client_auth_forced:
        return True

    return bool(ClientCertificateVerificator.get(aj.context).verify(x509))
def handle_api_config(self, http_context):
    """
    Read (GET) or update (POST) the panel configuration.

    Only available when the process runs with real UID 0; otherwise the
    request is rejected with ``EndpointReturn(403)``.

    * GET  - under ``core:config:read``: reload the master config and return it.
    * POST - under ``core:config:write``: merge the JSON body into the config,
      save it, reload the master config and return the result.

    Any other HTTP method falls through and yields ``None``.

    :param http_context: request context providing ``method`` and ``body``
    :raises EndpointReturn: with status 403 when not running as root
    """
    running_as_root = os.getuid() == 0
    if not running_as_root:
        raise EndpointReturn(403)

    worker = self.context.worker

    if http_context.method == 'GET':
        with authorize('core:config:read'):
            worker.reload_master_config()
            return aj.config.data

    if http_context.method == 'POST':
        with authorize('core:config:write'):
            changes = json.loads(http_context.body)
            aj.config.data.update(changes)
            aj.config.save()
            worker.reload_master_config()
            # Read the config again after the reload rather than reusing an
            # earlier reference.
            return aj.config.data
def authenticate(self, username, password):
    """
    Check a username/password pair against the users stored in the config.

    The master config is reloaded first. The stored password is a hex-encoded
    scrypt blob; authentication succeeds when it can be decrypted with the
    supplied password.

    :param username: login name to look up under ``auth.users``
    :param password: plain-text password (text; encoded to UTF-8 here)
    :returns: ``True`` on successful decryption, ``False`` for an unknown
        user or a failed decryption
    """
    # Function-scope import keeps this block self-contained.
    import binascii

    self.context.worker.reload_master_config()
    password = password.encode('utf-8')

    users = aj.config.data['auth']['users']
    if username not in users:
        return False

    stored_hash = users[username]['password']
    try:
        # str.decode('hex') exists only on Python 2; unhexlify performs the
        # same hex -> bytes conversion on both Python 2 and Python 3.
        scrypt.decrypt(binascii.unhexlify(stored_hash), password, maxtime=15)
    except scrypt.error as e:
        logging.debug('Auth failed: %s', e)
        return False
    return True
def get_environ(self):
    """
    Extend the base handler's environ with SSL / client-certificate details.

    Always sets ``SSL``, ``SSL_CLIENT_AUTH_FORCE``, ``SSL_CLIENT_VALID``
    (default ``False``) and ``SSL_CLIENT_USER`` (default ``None``). When the
    socket is an SSL socket and the peer presented a certificate, also sets
    ``SSL_CLIENT_CERTIFICATE`` and, for a truthy loaded certificate, the
    verified user, validity flag and the certificate's SHA-1 digest.

    :returns: the environ mapping produced by ``SocketIOHandler.get_environ``
        with the keys above added
    """
    env = SocketIOHandler.get_environ(self)

    over_ssl = isinstance(self.socket, gevent.ssl.SSLSocket)
    env['SSL'] = over_ssl
    env['SSL_CLIENT_AUTH_FORCE'] = aj.config.data['ssl']['client_auth']['force']
    env['SSL_CLIENT_VALID'] = False
    env['SSL_CLIENT_USER'] = None

    if not over_ssl:
        return env

    # True requests the binary (DER) form of the peer certificate.
    der_bytes = self.socket.getpeercert(True)
    if not der_bytes:
        return env

    pem_text = gevent.ssl.DER_cert_to_PEM_cert(der_bytes)
    certificate = crypto.load_certificate(crypto.FILETYPE_PEM, pem_text)
    env['SSL_CLIENT_CERTIFICATE'] = certificate
    if not certificate:
        return env

    user = ClientCertificateVerificator.get(aj.context).verify(certificate)
    env['SSL_CLIENT_VALID'] = bool(user)
    env['SSL_CLIENT_USER'] = user
    env['SSL_CLIENT_DIGEST'] = certificate.digest('sha1')
    return env
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
o, e = p.communicate()
if p.returncode != 0:
logging.error('Resource compilation failed')
logging.error(o + e)
manager = PluginManager.get(aj.context)
path = manager.get_content_path('core', 'content/pages/index.html')
content = open(path).read() % {
'prefix': http_context.prefix,
'plugins': json.dumps(
dict((manager[n]['info']['name'], manager[n]['info']['title']) for n in manager)
),
'config': json.dumps(aj.config.data),
'version': six.text_type(aj.version),
'platform': aj.platform,
'platformUnmapped': aj.platform_unmapped,
'bootstrapColor': aj.config.data.get('color', None),
}
http_context.add_header('Content-Type', 'text/html')
http_context.respond_ok()
return content
def handle_api_web_manifest(self, http_context):
return {
'short_name': aj.config.data['name'],
'name': '%s (%s)' % (aj.config.data['name'], socket.gethostname()),
'start_url': '%s/#app' % http_context.prefix,
'display': 'standalone',
'icons': [
{
'src': '%s/resources/core/resources/images/icon.png' % http_context.prefix,
'sizes': '1024x1024',
'type': 'image/png',
}