Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
Args:
name (str): Name of the alias to resolve.
Notes:
An exact match against the aliases will always be returned first.
If no exact match is found and the name contains a '/' in it, the
value before the slash is looked up and the remainder of the path
is joined to any result. This is done to support dynamic Telepath
share names.
Returns:
str: The url string, if present in the alias. None will be returned
if there are no matches.
'''
path = s_common.getSynPath('aliases.yaml')
if not os.path.isfile(path):
return None
conf = s_common.yamlload(path)
# Is there an exact match - if so, return it.
url = conf.get(name)
if url:
return url
# Since telepath supports dynamic shared object access,
# slice a name at the first '/', look up using that value
# and then append the second value to it.
dynname = None
if '/' in name:
name, dynname = name.split('/', 1)
def _main(): # pragma: no cover
s_common.setlogging(logger, 'DEBUG')
return main(sys.argv[1:])
The ca has signed all three certs. The ``server.crt`` is for
a server running on localhost. The ``root.crt`` and ``user.crt``
certs are both are user certs which can connect. They have the
common names "root@localhost" and "user@localhost", respectively.
Returns:
None
'''
fns = ('ca.crt', 'ca.key', 'ca.pem',
'server.crt', 'server.key', 'server.pem',
'root.crt', 'root.key', 'user.crt', 'user.key')
for fn in fns:
byts = s_data.get(fn)
dst = os.path.join(dirn, fn)
if not os.path.exists(dst):
with s_common.genfile(dst) as fd:
fd.write(byts)
def getLinkRelay(link):
'''
Get a LinkRelay for the given link tufo.
Example:
relay = getLinkRelay(link)
'''
proto = link[0]
ctor = protos.get(proto)
if ctor is None:
raise s_common.NoSuchProto(proto)
return ctor(link)
def _initTufoSnap(self, idens):
snap = s_common.guid()
if not idens:
return {'snap': snap, 'tufos': (), 'count': 0}
count = len(idens)
x = collections.deque(s_common.chunks(idens, 1000))
rets = x.popleft()
tufos = self.getTufosByIdens(rets)
if x:
self.snaps.put(snap, x)
return {'snap': snap, 'tufos': tufos, 'count': count}
def getMeldBytes(self):
'''
Return a msgpack packed copy of the MindMeld dictionary.
'''
return s_common.msgenpack(self.info)
pars.add_argument('--debug', '-d', default=False, action='store_true',
help='Drop to interactive prompt to inspect cortex after loading data.')
pars.add_argument('--format', '-f', type=str, action='store', default='syn.ingest',
help='Feed format to use for the ingested data.')
pars.add_argument('--modules', '-m', type=str, action='append', default=[],
help='Additional modules to load locally with a test Cortex.')
pars.add_argument('--chunksize', type=int, action='store', default=1000,
help='Default chunksize for iterating over items.')
pars.add_argument('--offset', type=int, action='store', default=0,
help='Item offset to start consuming msgpack files from.')
pars.add_argument('files', nargs='*', help='json/yaml/msgpack feed files')
return pars
if __name__ == '__main__': # pragma: no cover
s_common.setlogging(logger, 'DEBUG')
asyncio.run(main(sys.argv[1:]))
def _save_update_stats(self, client):
took = s_common.now() - client.starttime
self.xact.guarantee()
self.blobstor._clone_seqn.save(self.xact.txn, client.newhashes)
self.blobstor._metrics.inc(self.xact.txn, 'bytes', client.totalsize)
self.blobstor._metrics.inc(self.xact.txn, 'blobs', len(client.newhashes))
self.blobstor._metrics.record(self.xact.txn, {'time': client.starttime, 'size': client.totalsize, 'took': took})
def _main(): # pragma: no cover
s_common.setlogging(logger, 'DEBUG')
return main(sys.argv[1:])
Examples:
Read the blobsize of the first blob stored at in the BlobFile:
head = blob.readoff(0,s_blobfile.headsize)
Returns:
bytes: The bytes from a given offset.
Raises:
s_common.BadBlobFile: If not enough bytes were read to fullfill the request.
'''
byts = self.atom.readoff(off, size)
if len(byts) != size:
raise s_common.BadBlobFile(mesg='readoff was short',
expected_size=size,
byts_len=len(byts))
return byts