Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
self.cmds = {}
self.insecure = self.boot.get('insecure', False)
self.sessions = {}
self.httpsonly = self.conf.get('https:only', False)
self.boss = await s_boss.Boss.anit()
self.onfini(self.boss)
await self._initCellSlab(readonly=readonly)
self.hive = await self._initCellHive()
self.auth = await self._initCellAuth()
# check and migrate old cell auth
oldauth = s_common.genpath(self.dirn, 'auth')
if os.path.isdir(oldauth):
await s_compat.cellAuthToHive(oldauth, self.auth)
os.rename(oldauth, oldauth + '.old')
admin = self.boot.get('auth:admin')
if admin is not None:
name, passwd = admin.split(':', 1)
user = self.auth.getUserByName(name)
if user is None:
user = await self.auth.addUser(name)
await user.setAdmin(True)
await user.setPasswd(passwd)
self.insecure = False
Args:
name (str): Name of the CryoTank.
Returns:
CryoTank: A CryoTank instance.
'''
tank = self.tanks.get(name)
if tank is not None:
return tank
iden = s_common.guid()
logger.info('Creating new tank: [%s][%s]', name, iden)
path = s_common.genpath(self.dirn, 'tanks', iden)
tank = await CryoTank.anit(path, conf)
node = await self.names.open((name,))
await node.set((iden, conf))
self.tanks.put(name, tank)
return tank
def getSvcPath(self, *path):
    '''
    Build a path located under the service directory.

    Args:
        *path: Path elements appended, in order, after ``self.dirn``.

    Returns:
        The result of ``s_common.genpath`` applied to ``self.dirn``
        followed by the given elements.
    '''
    # Prepend the service directory so genpath receives one flat sequence.
    parts = (self.dirn,) + path
    return s_common.genpath(*parts)
conf (dict): Configuration dictionary for the cell. This may contain
the ctor in the ``ctor`` key.
Returns:
((str, function)): The python path to the ctor function and the resolved function.
Raises:
ReqConfOpt: If the ctor cannot be resolved from the cell path or conf
NoSuchCtor: If the ctor function cannot be resolved.
'''
ctor = None
if conf is not None:
ctor = conf.get('ctor')
path = s_common.genpath(dirn, 'config.json')
if ctor is None and os.path.isfile(path):
subconf = s_common.jsload(path)
ctor = subconf.get('ctor')
if ctor is None:
raise s_common.ReqConfOpt(mesg='Missing ctor, cannot divide',
name='ctor')
func = s_dyndeps.getDynLocal(ctor)
if func is None:
raise s_common.NoSuchCtor(mesg='Cannot resolve ctor',
name=ctor)
return ctor, func
def getCorePath(self, *paths):
    '''
    Construct a path relative to the cortex metadata dir (or None).

    The base directory is read from the ``dir`` config option; when that
    option is unset (``None``) no path can be built.

    Args:
        *paths ([str,]): Path elements to append to the base directory.

    Returns:
        (str): The full path, or None if the ``dir`` option is None.
    '''
    basedir = self.getConfOpt('dir')
    # genpath is only reached when a base directory is configured.
    return None if basedir is None else s_common.genpath(basedir, *paths)
def _getTankIden(self):
    '''
    Load the iden stored in the ``guid`` file under ``self.dirn``.

    If the ``guid`` file is missing but a legacy ``cell.guid`` file exists,
    its contents are copied into ``guid`` and the legacy file is removed.

    Returns:
        (str): The whitespace-stripped iden read from disk, or None when
        neither file exists (as far as the code visible here goes).
    '''
    guidpath = s_common.genpath(self.dirn, 'guid')
    if os.path.isfile(guidpath):
        with open(guidpath, 'r') as fd:
            return fd.read().strip()

    # legacy cell code: migrate cell.guid -> guid
    legacypath = s_common.genpath(self.dirn, 'cell.guid')
    if not os.path.isfile(legacypath):
        return None

    with open(legacypath, 'r') as fd:
        iden = fd.read().strip()

    with open(guidpath, 'w') as fd:
        fd.write(iden)

    # Only drop the legacy file once the new one has been written.
    os.unlink(legacypath)
    return iden
def _initDbInfo(self):
name = self._link[1].get('path')[1:]
if not name:
raise Exception('No Path Specified!')
if name.find(':') == -1:
name = s_common.genpath(name)
return {'name': name}
outp = s_output.OutPut()
pars = makeargpaser()
opts = pars.parse_args(argv)
if opts.module:
mod = s_dyndeps.tryDynMod(opts.module)
outp.printf(f'Loaded {opts.module}@{mod}')
if opts.cells:
outp.printf('Registered cells:')
for cname, cpath in s_cells.getCells():
outp.printf(f'{cname:<10} {cpath:>10}')
return 0
dirn = s_common.genpath(opts.dmonpath, 'cells', opts.cellname)
if os.path.isdir(dirn):
outp.printf(f'cell directory already exists: {dirn}')
return 1
dmon = {}
if opts.listen:
dmon['listen'] = opts.listen
if opts.module:
dmon['modules'] = [opts.module]
if dmon:
dmon.setdefault('modules', [])
dmon_fp = os.path.join(opts.dmonpath, 'dmon.yaml')
if os.path.exists(dmon_fp):
outp.printf(f'Cannot overwrite existing dmon.yaml file. [{dmon_fp}]')
def getCaCerts(self):
    '''
    Return a list of CA certs from the CertDir.

    Every entry in the ``cas`` subdirectory of ``self.certdir`` whose name
    ends in ``.crt`` is loaded with ``self._loadCertPath``; other entries
    are ignored.

    Returns:
        [OpenSSL.crypto.X509]: List of CA certificates.
    '''
    casdir = s_common.genpath(self.certdir, 'cas')
    return [
        self._loadCertPath(s_common.genpath(self.certdir, 'cas', fname))
        for fname in os.listdir(casdir)
        if fname.endswith('.crt')
    ]
def genauth(opts, outp=s_output.stdout):
authpath = s_common.genpath(opts.authfile)
savepath = s_common.genpath(opts.savepath)
if not os.path.isfile(authpath):
outp.printf('auth file not found: %s' % (authpath,))
return
auth = s_msgpack.loadfile(authpath)
addr = auth[1].get('neuron')
if addr is None:
outp.printf('auth file has no neuron info: %s' % (authpath, ))
return
celluser = s_cell.CellUser(auth)
with celluser.open(addr, timeout=20) as sess: