Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _onSetCellPoolConf(self, conf):
    '''
    Validate a cell pool configuration and bring up a CellPool from it.

    Args:
        conf (dict): Configuration holding 'auth' (base64 text of a msgpack
            encoded value), 'host' and 'port'.

    Raises:
        s_common.BadConfValu: If 'auth' is missing or empty, if 'host' or
            'port' is None, or if the new pool's neurwait() call returns a
            false value within self.cell_timeout.

    Notes:
        On the host/port and neurwait failures the 'auth' entry is removed
        from conf before the exception is raised.
    '''
    def _reject(mesg):
        # Remove the 'auth' entry from conf, then fail with the given message.
        logger.info('Popping "auth" with private data from mesg.')
        conf.pop('auth', None)
        raise s_common.BadConfValu(mesg=mesg, key='cellpool:conf')

    encoded = conf.get('auth')
    if not encoded:
        raise s_common.BadConfValu(mesg='auth must be set', key='cellpool:conf')
    auth = s_msgpack.un(s_common.debase64(encoded))

    neuraddr = (conf.get('host'), conf.get('port'))
    if any(part is None for part in neuraddr):
        _reject('host and port must be set')

    # Tear down any pool that is already running before building the new one.
    if self.cellpool:
        self.cellpool.fini()

    pool = s_cell.CellPool(auth, neuraddr)
    if not pool.neurwait(timeout=self.cell_timeout):
        pool.fini()
        _reject('unable to set up cell pool')
def _write_props(self, txn, rows):
    '''
    Emit propbags to the migration DB

    Every row is msgpack encoded as a 2-tuple of its second and third
    fields and queued for self.iden_tbl under the encoded form of its first
    field.  An encoding longer than MAX_VAL_LEN bytes is instead queued for
    self.valu_tbl, keyed by self.next_val packed as 8 big-endian bytes, and
    that 8 byte key is what goes into the iden table.  self.next_val is
    incremented for each such oversized value.

    Args:
        txn: Transaction used to open cursors on self.iden_tbl and
            self.valu_tbl.
        rows: Sized sequence of 4-tuples; the fourth field is ignored.
            (Presumably (iden, prop, valu, tstamp) - confirm with callers.)

    Raises:
        ConsistencyError: If putmulti() on the iden table does not add
            exactly one item per row, or putmulti() on the value table
            adds fewer items than it consumed.
    '''
    # NOTE(review): 511 looks like LMDB's default maximum key size - confirm.
    MAX_VAL_LEN = 511

    idens = []
    bigvals = []
    for iden, prop, valu, _ in rows:
        enci = _enc_iden(iden)
        val = s_msgpack.en((prop, valu))
        if len(val) > MAX_VAL_LEN:
            # Too large to store inline: park it in the value table and
            # store the 8 byte reference in its place.
            next_val_enc = self.next_val.to_bytes(8, 'big')
            bigvals.append((next_val_enc, val))
            val = next_val_enc
            self.next_val += 1
        idens.append((enci, val))

    with txn.cursor(self.iden_tbl) as icurs, txn.cursor(self.valu_tbl) as vcurs:
        consumed, added = icurs.putmulti(idens)
        if consumed != added or added != len(rows):
            # Interpolate the counts; passing them as extra exception
            # arguments would leave the %d placeholders unfilled.
            raise ConsistencyError(
                'Failure writing to Db. consumed %d != added %d != len(rows) %d' %
                (consumed, added, len(rows)))

        consumed, added = vcurs.putmulti(bigvals)
        if consumed != added:
            raise ConsistencyError(
                'Failure writing to Db. consumed %d != added %d' %
                (consumed, added))
def __init__(self, plex, sock):
    '''
    Initialize a link bound to a socket owned by a plex.

    Args:
        plex: Object providing _finiPlexSock(); it is called with sock when
            this link is finalized.
        sock: The socket this link is associated with.
    '''
    Link.__init__(self, None)

    self.sock = sock
    self.plex = plex

    # Receive side: starts out registered for read events only.
    self.flags = selectors.EVENT_READ
    self.unpk = s_msgpack.Unpk()

    # Transmit side: pending buffer, queue of (byts, info) and its lock.
    self.txlock = threading.Lock()
    self.txque = collections.deque()  # (byts, info)
    self.txbuf = b''

    def _releasePlexSock():
        self.plex._finiPlexSock(self.sock)

    self.onfini(_releasePlexSock)
def convert_filebytes_secondary(self, formname, propval):
    '''
    Convert secondary prop that is a filebytes type

    Looks up the UTF-8 encoding of propval in self.comp_tbl and returns the
    second element of the msgpack decoded record stored there.

    Args:
        formname: Not used by this method.
        propval (str): The property value used as the lookup key.

    Raises:
        ConsistencyError: If no record is stored for propval.
    '''
    with self.dbenv.begin(db=self.comp_tbl) as txn:
        stored = txn.get(propval.encode('utf8'), db=self.comp_tbl)
        if stored is not None:
            record = s_msgpack.un(stored)
            return record[1]
        raise ConsistencyError('ndef accessed before determined')
Args:
xact (lmdb.Transaction): An LMDB transaction.
offs (int): The offset to begin iterating from.
Yields:
(indx, valu): The index and valu of the item.
'''
# Pack the starting offset as an 8 byte big-endian unsigned integer key.
lkey = struct.pack('>Q', offs)
with xact.cursor(db=self.db) as curs:
# If set_key() reports failure for the starting key, end without yielding anything.
# NOTE(review): this appears to need an entry stored at exactly offs - confirm
# that callers never pass an offset that has no entry.
if not curs.set_key(lkey):
return
# Walk the cursor's iternext() results, unpacking each key back to an
# integer index and msgpack decoding each value.
for lkey, lval in curs.iternext():
indx = struct.unpack('>Q', lkey)[0]
valu = s_msgpack.un(lval)
yield indx, valu
Args:
valu (object): Optional, if provided, the hash of the msgpack
encoded form of the object is returned. This can be used to
create stable buids.
Notes:
By default, this returns a random 32 byte value.
Returns:
bytes: A 32 byte value.
'''
# No value supplied: return 32 bytes from os.urandom().
if valu is None:
return os.urandom(32)
# Otherwise return the 32 byte SHA-256 digest of the msgpack encoding of valu.
byts = s_msgpack.en(valu)
return hashlib.sha256(byts).digest()
# Fall back to an empty dict when no extra genStoreRows() arguments were given.
if not genrows_kwargs:
genrows_kwargs = {}
# i is the total number of rows seen; j is the number of rows since the last write.
i = 0
j = 0
# Combined length, in bytes, of the encoded messages currently held in bufs.
cur_bytes = 0
bufs = []
# Start from the preset arguments for this store type, then apply the caller's values on top.
# NOTE(review): update() modifies the dict returned by preset_args.get() in place, so
# a dict stored in preset_args would keep these overrides - confirm this is intended.
kwargs = preset_args.get(store.getStoreType(), {})
kwargs.update(genrows_kwargs)
# tick/tock bracket the dump so the elapsed time can be reported at the end.
tick = time.time()
for rows in store.genStoreRows(**kwargs):
j += len(rows)
i += len(rows)
tufo = s_tufo.tufo('core:save:add:rows', rows=rows)
# When compressing, the 'rows' entry is replaced by the gzip (level 9) compressed
# msgpack encoding of the rows.
if compress:
tufo[1]['rows'] = gzip.compress(s_msgpack.en(rows), 9)
byts = s_msgpack.en(tufo)
bufs.append(byts)
cur_bytes += len(byts)
# Once the buffered bytes exceed s_const.mebibyte * DUMP_MEGS, write them out in one
# call and reset the per-batch state.
if cur_bytes > s_const.mebibyte * DUMP_MEGS:
fd.write(b''.join([byts for byts in bufs]))
outp.printf('Stored {} rows, total {} rows'.format(j, i))
bufs = []
cur_bytes = 0
j = 0
# There may still be rows we need to write out.
if bufs:
fd.write(b''.join([byts for byts in bufs]))
outp.printf('Stored {} rows, total {} rows'.format(j, i))
bufs = []
tock = time.time()
outp.printf('Done dumping rows - took {} seconds.'.format(tock - tick))
def norm(self, valu):
    '''
    Normalize a value by round-tripping it through msgpack.

    Args:
        valu: The value to encode with s_msgpack.en() and decode again.

    Returns:
        tuple: The decoded value and an empty dict.
    '''
    roundtripped = s_msgpack.un(s_msgpack.en(valu))
    return roundtripped, {}
This function reads an "old" splice log file, writes to a temporary
file, and then overwrites the old file with the new data. This function
only converts old splices to new splices. If any messages are invalid,
an exception will be raised and the conversion will exit early and not
overwrite any data.
Returns:
None
'''
# Converted messages are staged in a spooled temporary file before anything is
# written back to fpath.
with tempfile.SpooledTemporaryFile() as tmp:
# 'r+b' opens the existing file for both reading and writing in binary mode.
with open(fpath, 'r+b') as fd:
# Consume the messages read from fd in chunks (s_common.chunks() is called with
# 1000), converting each one and appending its msgpack encoding to tmp.
for chnk in s_common.chunks(s_msgpack.iterfd(fd), 1000):
for mesg in chnk:
mesg = convertOldSplice(mesg)
tmp.write(s_msgpack.en(mesg))
# Rewind both files so the staged data is copied over the start of the original.
tmp.seek(0)
fd.seek(0)
# Copy from tmp to fd in reads of _readsz until tmp returns no more data.
data = tmp.read(_readsz)
while data:
fd.write(data)
data = tmp.read(_readsz)
# Cut the file off at the current position, discarding any old bytes beyond it.
fd.truncate()