Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def handle_api_fs_write(self, http_context, path=None):
    """Write the request body to the file at ``path``.

    When the request query carries a non-empty ``encoding`` value, the body
    is decoded as UTF-8 and re-encoded with that encoding before writing.

    :param http_context: request object exposing ``body`` and ``query``
    :param path: destination file path
    :raises EndpointError: if an ``OSError`` occurs
    """
    try:
        content = http_context.body
        if http_context.query:
            encoding = http_context.query.get('encoding', None)
            if encoding:
                content = content.decode('utf-8').encode(encoding)
        # Bytes cannot be written through a text-mode handle; choose the
        # mode from the payload type so both bytes and text bodies work.
        mode = 'wb' if isinstance(content, bytes) else 'w'
        with open(path, mode) as f:
            f.write(content)
    except OSError as e:
        raise EndpointError(e)
def handle_api_time_sync(self, http_context):
    """Run ``ntpdate`` against ``0.pool.ntp.org`` and return the Unix time.

    :returns: current time as whole seconds since the epoch
    :raises EndpointError: wrapping any exception raised by the command
    """
    ntp_command = ['ntpdate', '0.pool.ntp.org']
    try:
        subprocess.check_call(ntp_command)
    except Exception as err:
        raise EndpointError(err)
    else:
        return int(time.time())
def handle_api_repo_list(self, http_context):
    """Fetch the plugin list from ajenti.org and return it as decoded JSON.

    The directory ``/root/.cache/pip`` is removed first when it exists.

    :returns: the JSON-decoded response body
    :raises EndpointError: wrapping any exception raised by the request or
        by JSON decoding
    """
    if os.path.exists('/root/.cache/pip'):
        shutil.rmtree('/root/.cache/pip')
    try:
        # requests waits indefinitely by default; bound the wait so an
        # unresponsive server cannot hang this handler.
        response = requests.get('http://ajenti.org/plugins/list', timeout=30)
        return response.json()
    except Exception as e:
        raise EndpointError(e)
def handle_api_fs_chmod(self, http_context, path=None):
    """Apply the ``mode`` given in the JSON request body to ``path``.

    :param http_context: request object whose ``body`` is a JSON document
        containing a ``mode`` entry
    :param path: filesystem path to change
    :raises EndpointReturn: with 404 when ``path`` does not exist
    :raises EndpointError: if ``os.chmod`` fails with ``OSError``
    """
    if not os.path.exists(path):
        raise EndpointReturn(404)
    payload = json.loads(http_context.body)
    requested_mode = payload['mode']
    try:
        os.chmod(path, requested_mode)
    except OSError as err:
        raise EndpointError(err)
def handle_api_time_sync(self, http_context):
    """Synchronise the clock with ``ntpdate`` and return the Unix time.

    ``which ntpdate`` is run first; a non-zero exit status is reported as
    the utility not being installed.

    :returns: current time as whole seconds since the epoch
    :raises EndpointError: when ``which`` exits non-zero, or wrapping any
        exception raised while running ``ntpdate``
    """
    ntpdate_missing = subprocess.call(['which', 'ntpdate']) != 0
    if ntpdate_missing:
        raise EndpointError(_('ntpdate utility is not installed'))
    try:
        subprocess.check_call(['ntpdate', '0.pool.ntp.org'])
    except Exception as err:
        raise EndpointError(err)
    return int(time.time())
# The route's named groups had lost their names ("(?P\w+)" is not valid
# regex syntax); they are restored to match the handler's keyword parameters.
# NOTE(review): confirm the router binds groups to arguments by name.
@url(r'/api/services/do/(?P<manager_id>\w+)/(?P<operation>\w+)/(?P<service_id>.+)')
@authorize('services:manage')
@endpoint(api=True)
def handle_api_operate(self, http_context, manager_id=None, operation=None, service_id=None):
    """Run ``start``, ``stop`` or ``restart`` on a service.

    Any other ``operation`` value is ignored and the handler returns None.

    :param manager_id: key into ``self.managers``
    :param operation: name of the manager method to invoke
    :param service_id: argument passed to that method
    :raises EndpointError: wrapping a ``ServiceOperationError``
    """
    # Whitelist guards the getattr dispatch below against arbitrary
    # attribute access driven by the URL.
    if operation not in ['start', 'stop', 'restart']:
        return
    try:
        getattr(self.managers[manager_id], operation)(service_id)
    except ServiceOperationError as e:
        raise EndpointError(e)
def handle_api_fs_create_directory(self, http_context, path=None):
    """Create the directory ``path``, including missing parent directories.

    :param path: directory path to create
    :raises EndpointError: if ``os.makedirs`` fails with ``OSError``
    """
    new_directory = path
    try:
        os.makedirs(new_directory)
    except OSError as err:
        raise EndpointError(err)
def handle_api_time_sync(self, http_context):
    """Check that ``ntpdate`` is available, run it, and return the Unix time.

    :returns: current time as whole seconds since the epoch
    :raises EndpointError: when ``which ntpdate`` exits non-zero, or
        wrapping any exception raised while running ``ntpdate``
    """
    which_status = subprocess.call(['which', 'ntpdate'])
    if which_status != 0:
        raise EndpointError(_('ntpdate utility is not installed'))

    sync_command = ['ntpdate', '0.pool.ntp.org']
    try:
        subprocess.check_call(sync_command)
    except Exception as failure:
        raise EndpointError(failure)
    else:
        return int(time.time())
def handle_script(self, http_context):
    """Run the ``script`` from the JSON request body through ``bash -c``.

    The optional ``input`` entry is sent to the script's standard input.

    :returns: dict with ``code`` (exit status), ``output`` (captured stdout)
        and ``stderr`` (captured stderr)
    :raises EndpointError: if the process cannot be started or communicated
        with
    """
    # NOTE(review): this executes caller-supplied shell text verbatim;
    # confirm the route is restricted to trusted, authorized users.
    data = http_context.json_body()
    stdin_data = data.get('input', None)
    # The pipes are opened in binary mode, so text input must be encoded.
    if stdin_data is not None and not isinstance(stdin_data, bytes):
        stdin_data = stdin_data.encode('utf-8')
    try:
        p = subprocess.Popen(
            ['bash', '-c', data['script']],
            # communicate() only delivers input when stdin is a pipe;
            # without it the child inherits the server's stdin instead.
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            close_fds=True
        )
        stdout_data, stderr_data = p.communicate(stdin_data)
    except (subprocess.CalledProcessError, OSError) as err:
        raise EndpointError(err)
    return {
        'code': p.returncode,
        'output': stdout_data,
        'stderr': stderr_data,
    }