Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import subprocess
from jadi import component
from aj.plugins.services.api import ServiceManager, Service
@component(ServiceManager)
class SystemdServiceManager(ServiceManager):
id = 'systemd'
name = 'systemd'
@classmethod
def __verify__(cls):
    """
    Report whether the ``systemctl`` executable can be found on ``PATH``.

    :returns: ``True`` if ``systemctl`` is found, ``False`` otherwise.
    """
    # Function-scope import so the file's top-level imports stay untouched.
    import shutil

    # shutil.which performs the PATH lookup in-process: it does not depend on
    # an external ``which`` binary (spawning a missing one raises OSError)
    # and nothing is echoed to this process's stdout.
    return shutil.which('systemctl') is not None
def __init__(self, context):
    """Accept ``context`` without using it; no instance state is set up."""
# Resolve the set of systemd service unit names to work on.
#
# units: optional collection of unit names. When it is falsy, unit names are
#   read from `systemctl list-unit-files` instead.
#
# NOTE(review): only the name-discovery step is visible in this excerpt and
# nothing is returned here -- the method appears to continue; confirm against
# the full file.
def list(self, units=None):
if not units:
# First whitespace-separated field of every non-empty output line.
units = [x.split()[0] for x in subprocess.check_output(['systemctl', 'list-unit-files', '--no-legend', '--no-pager', '-la']).splitlines() if x]
# Keep names ending in `.service` and drop any name containing `@`
# (the comparison uses bytes literals, so the names are bytes here).
units = [x for x in units if x.endswith(b'.service') and b'@' not in x]
# Remove duplicates; going through set() does not keep the original order.
units = list(set(units))
import subprocess
from jadi import component
from aj.plugins.services.api import ServiceManager, Service
@component(ServiceManager)
class SystemdServiceManager(ServiceManager):
id = 'systemd'
name = 'systemd'
@classmethod
def __verify__(cls):
    """
    Report whether the ``systemctl`` executable can be found on ``PATH``.

    :returns: ``True`` if ``systemctl`` is found, ``False`` otherwise.
    """
    # Function-scope import so the file's top-level imports stay untouched.
    import shutil

    # shutil.which performs the PATH lookup in-process: it does not depend on
    # an external ``which`` binary (spawning a missing one raises OSError)
    # and nothing is echoed to this process's stdout.
    return shutil.which('systemctl') is not None
def __init__(self, context):
    """Accept ``context`` without using it; no instance state is set up."""
# Resolve the set of systemd service unit names to work on.
#
# units: optional collection of unit names. When it is falsy, unit names are
#   read from `systemctl list-unit-files` instead.
#
# NOTE(review): only the name-discovery step is visible in this excerpt and
# nothing is returned here -- the method appears to continue; confirm against
# the full file.
def list(self, units=None):
if not units:
# First whitespace-separated field of every non-empty output line.
units = [x.split()[0] for x in subprocess.check_output(['systemctl', 'list-unit-files', '--no-legend', '--no-pager', '-la']).splitlines() if x]
# Keep names ending in `.service` and drop any name containing `@`
# (the comparison uses bytes literals, so the names are bytes here).
units = [x for x in units if x.endswith(b'.service') and b'@' not in x]
# Remove duplicates; going through set() does not keep the original order.
units = list(set(units))
from supervisor.options import ClientOptions
from jadi import component
from aj.plugins.services.api import ServiceManager, Service, ServiceOperationError
@component(ServiceManager)
class SupervisorServiceManager(ServiceManager):
id = 'supervisor'
name = 'Supervisor'
def __init__(self, context):
    """
    Create the supervisor RPC handle used by the other methods.

    ``context`` is accepted but not used here. The handle is stored on
    ``self.supervisor``.
    """
    client_options = ClientOptions()
    # Realize the options from an explicitly empty argument list.
    client_options.realize([])
    server_proxy = client_options.getServerProxy()
    self.supervisor = server_proxy.supervisor
def list(self):
    """
    Lazily yield one service object per process reported by supervisor.

    Each record returned by ``self.supervisor.getAllProcessInfo()`` is
    passed through ``__make_service`` and the result is yielded.
    """
    process_records = self.supervisor.getAllProcessInfo()
    for process_record in process_records:
        service = self.__make_service(process_record)
        yield service
# Build a Service object for one supervisor process-info record.
#
# info: mapping that is read by key; 'name' is the only key used in this
#   excerpt.
#
# NOTE(review): no return statement is visible, yet list() yields this call's
# result -- the definition appears to be cut off; confirm against the full
# file.
def __make_service(self, info):
# The Service is constructed with this manager as its argument.
svc = Service(self)
# The process name is used both as the service id and as its name.
svc.id = info['name']
svc.name = info['name']
def __init__(self, context):
    """
    Store the context and index the available service managers by id.

    ``self.managers`` maps each manager's ``id`` attribute to the manager
    object, for every manager returned by ``ServiceManager.all``.
    """
    self.context = context
    available_managers = ServiceManager.all(self.context)
    self.managers = {manager.id: manager for manager in available_managers}
from supervisor.options import ClientOptions
from jadi import component
from aj.plugins.services.api import ServiceManager, Service, ServiceOperationError
@component(ServiceManager)
class SupervisorServiceManager(ServiceManager):
id = 'supervisor'
name = 'Supervisor'
def __init__(self, context):
    """
    Create the supervisor RPC handle used by the other methods.

    ``context`` is accepted but not used here. The handle is stored on
    ``self.supervisor``.
    """
    client_options = ClientOptions()
    # Realize the options from an explicitly empty argument list.
    client_options.realize([])
    server_proxy = client_options.getServerProxy()
    self.supervisor = server_proxy.supervisor
def list(self):
    """
    Lazily yield one service object per process reported by supervisor.

    Each record returned by ``self.supervisor.getAllProcessInfo()`` is
    passed through ``__make_service`` and the result is yielded.
    """
    process_records = self.supervisor.getAllProcessInfo()
    for process_record in process_records:
        service = self.__make_service(process_record)
        yield service
# Build a Service object for one supervisor process-info record.
#
# info: mapping that is read by key; 'name' is the only key used in this
#   excerpt.
#
# NOTE(review): no return statement is visible, yet list() yields this call's
# result -- the definition appears to be cut off; confirm against the full
# file.
def __make_service(self, info):
# The Service is constructed with this manager as its argument.
svc = Service(self)
# The process name is used as the service id.
svc.id = info['name']
from dbus.exceptions import DBusException
from upstart.system import UpstartSystem, DirectUpstartBus
from upstart.job import UpstartJob
from jadi import component
from aj.plugins.services.api import ServiceManager, Service, ServiceOperationError
@component(ServiceManager)
class UpstartServiceManager(ServiceManager):
id = 'upstart'
name = 'Upstart'
@classmethod
def __verify__(cls):
    """
    Report whether an ``UpstartSystem`` can be constructed.

    Default construction is tried first. If that raises, a second attempt is
    made with an explicit ``DirectUpstartBus``.

    :returns: ``True`` if either attempt succeeds, ``False`` if both raise.
    """
    # ``except Exception`` instead of a bare ``except`` so that
    # KeyboardInterrupt and SystemExit are not swallowed by the probe.
    try:
        UpstartSystem()
    except Exception:
        try:
            # Fallback: the bus object is only created once the default
            # construction has failed.
            UpstartSystem(bus=DirectUpstartBus())
        except Exception:
            return False
    return True
import os
import subprocess
from jadi import component
from aj.plugins.services.api import ServiceManager, Service
# Directory whose existence SysVServiceManager.__verify__ checks and whose
# entries SysVServiceManager.list enumerates.
INIT_D = '/etc/init.d'
# Path template with a single %s placeholder; its use is not visible in this
# excerpt.
UPSTART_PATTERN = '/etc/init/%s.conf'
@component(ServiceManager)
class SysVServiceManager(ServiceManager):
id = 'sysv'
name = 'System V'
@classmethod
def __verify__(cls):
    """Report whether the ``INIT_D`` path exists on this system."""
    init_dir_present = os.path.exists(INIT_D)
    return init_dir_present
def __init__(self, context):
    """Accept ``context`` without using it; no instance state is set up."""
# Walk the entries of the INIT_D directory.
#
# NOTE(review): the loop body is cut off in this excerpt (it ends on an `if`
# line with no body), so what is produced for each entry is not visible here.
def list(self):
for _id in os.listdir(INIT_D):
# Full path of the entry; its use is not visible in this excerpt.
path = os.path.join(INIT_D, _id)
# Entries whose name starts with a dot are skipped.
if _id.startswith('.'):
continue
# Names starting with 'rc' are tested next; the branch body is not visible.
if _id.startswith('rc'):
import os
import subprocess
from jadi import component
from aj.plugins.services.api import ServiceManager, Service
# Directory whose existence SysVServiceManager.__verify__ checks and whose
# entries SysVServiceManager.list enumerates.
INIT_D = '/etc/init.d'
# Path template with a single %s placeholder; its use is not visible in this
# excerpt.
UPSTART_PATTERN = '/etc/init/%s.conf'
@component(ServiceManager)
class SysVServiceManager(ServiceManager):
id = 'sysv'
name = 'System V'
@classmethod
def __verify__(cls):
    """Report whether the ``INIT_D`` path exists on this system."""
    init_dir_present = os.path.exists(INIT_D)
    return init_dir_present
def __init__(self, context):
    """Accept ``context`` without using it; no instance state is set up."""
# Walk the entries of the INIT_D directory.
#
# NOTE(review): only the start of the loop body is visible in this excerpt,
# so what is produced for each entry cannot be seen here; confirm against the
# full file.
def list(self):
for _id in os.listdir(INIT_D):
# Full path of the entry; its use is not visible in this excerpt.
path = os.path.join(INIT_D, _id)
# Entries whose name starts with a dot are skipped.
if _id.startswith('.'):
continue
from dbus.exceptions import DBusException
from upstart.system import UpstartSystem, DirectUpstartBus
from upstart.job import UpstartJob
from jadi import component
from aj.plugins.services.api import ServiceManager, Service, ServiceOperationError
@component(ServiceManager)
class UpstartServiceManager(ServiceManager):
id = 'upstart'
name = 'Upstart'
@classmethod
def __verify__(cls):
    """
    Report whether an ``UpstartSystem`` can be constructed.

    Default construction is tried first. If that raises, a second attempt is
    made with an explicit ``DirectUpstartBus``.

    :returns: ``True`` if either attempt succeeds, ``False`` if both raise.
    """
    # ``except Exception`` instead of a bare ``except`` so that
    # KeyboardInterrupt and SystemExit are not swallowed by the probe.
    try:
        UpstartSystem()
    except Exception:
        try:
            # Fallback: the bus object is only created once the default
            # construction has failed.
            UpstartSystem(bus=DirectUpstartBus())
        except Exception:
            return False
    return True
def __init__(self, context):