Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@url(r'/api/plugins/pypi/list')
@endpoint(api=True)
def handle_api_pypi_list(self, http_context):
r = {}
for l in subprocess.check_output(['pip', 'freeze']).splitlines():
if l:
package = l.decode().split('=')[0]
if package:
prefix = 'ajenti.plugin.'
if package.startswith(prefix):
name = package[len(prefix):]
r[name] = package
return r
@url(r'/api/network/downup/(?P.+)')
@authorize('network:updown')
@endpoint(api=True)
def handle_api_downup(self, http_context, iface=None):
self.manager.down(iface)
self.manager.up(iface)
@url(r'/api/filesystem/create-directory/(?P
@url(r'/api/augeas/endpoint/set/(?P.+)')
@endpoint(api=True)
def handle_api_set(self, http_context, id=None):
data = json.loads(http_context.body)
ep = self.__get_augeas_endpoint(id)
if not ep:
raise EndpointReturn(404)
aug = ep.get_augeas()
aug.load()
def __apply_tree(e):
aug.set(e['path'], e['value'])
for sp in aug.match(e['path'] + '/*'):
aug.remove(sp)
for child in e['children']:
@url('/api/terminal/create')
@authorize('terminal:open')
@endpoint(api=True)
def handle_create(self, http_context):
options = http_context.json_body()
return self.mgr.create(**options)
@url(r'/api/core/user-config')
@endpoint(api=True)
def handle_api_user_config(self, http_context):
if http_context.method == 'GET':
return UserConfig.get(self.context).data
if http_context.method == 'POST':
data = json.loads(http_context.body)
config = UserConfig.get(self.context)
config.data.update(data)
config.save()
@url(r'/api/network/config/get')
@endpoint(api=True)
def handle_api_config_get(self, http_context):
return self.manager.get_config()
@url(r'/api/settings/generate-client-certificate')
@endpoint(api=True)
def handle_api_generate_client_certificate(self, http_context):
data = json.loads(http_context.body)
key = PKey()
key.generate_key(TYPE_RSA, 4096)
ca_key = load_privatekey(FILETYPE_PEM, open(aj.config.data['ssl']['certificate']).read())
ca_cert = load_certificate(FILETYPE_PEM, open(aj.config.data['ssl']['certificate']).read())
cert = X509()
cert.get_subject().countryName = data['c']
cert.get_subject().stateOrProvinceName = data['st']
cert.get_subject().organizationName = data['o']
cert.get_subject().commonName = data['cn']
cert.set_pubkey(key)
cert.set_serial_number(random.getrandbits(8 * 20))
cert.gmtime_adj_notBefore(0)
@url(r'/api/plugins/pypi/install/(?P.+)/(?P.+)')
@endpoint(api=True)
def handle_api_pypi_install(self, http_context, name=None, version=None):
# TODO replaced with a task
try:
subprocess.call(['pip', 'install', 'ajenti.plugin.%s==%s' % (name, version)])
except subprocess.CalledProcessError as e:
raise EndpointError(e.output)
@url(r'/api/datetime/tz/set/(?P.+)')
@endpoint(api=True)
def handle_api_tz_set(self, http_context, tz=None):
return self.manager.set_tz(tz)