Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _save(self, data=None):
    """Serialize data to JSON and store it in the buffer or on disk.

    While ``_BUFFERED_MODE`` is positive, the encoded blob is handed to
    ``_store_in_buffer`` together with ``self._filename``. Otherwise it is
    written to ``self._filename``: when ``self._write_concern`` is set, the
    blob is first written to a uniquely named temporary file in the same
    directory and then moved over the target with ``os.replace``; when it
    is not set, the target file is overwritten directly.

    :param data: The data to serialize. If None, the result of
        ``self._as_dict()`` is used instead.
    :raises OSError: Propagated from the underlying file operations.
    """
    assert self._filename is not None
    if data is None:
        data = self._as_dict()
    # Serialize data:
    blob = json.dumps(data).encode()
    if _BUFFERED_MODE > 0:
        _store_in_buffer(self._filename, blob)
    elif self._write_concern:
        # Saving to disk via a temporary file, so that the target is only
        # ever replaced by a completely written file.
        dirname, filename = os.path.split(self._filename)
        fn_tmp = os.path.join(dirname, '._{uid}_{fn}'.format(
            uid=uuid.uuid4(), fn=filename))
        try:
            with open(fn_tmp, 'wb') as tmpfile:
                tmpfile.write(blob)
            os.replace(fn_tmp, self._filename)
        except BaseException:
            # Do not leave a stray temporary file behind when the write or
            # the replace failed; the original error is re-raised below.
            try:
                os.remove(fn_tmp)
            except OSError:
                pass  # The temporary file was never created or is gone.
            raise
    else:
        # Saving to disk directly:
        with open(self._filename, 'wb') as file:
            file.write(blob)
1. If the filter is None, the result is directly returned, since
all documents will match an empty filter.
2. If the filter argument contains a primary key, the result
is directly returned since no search operation is necessary.
3. The filter is processed key by key; as soon as the result vector
is empty, it is immediately returned.
:param filter: The filter argument that all documents must match.
:param limit: Limit the size of the result vector.
:raises ValueError: In case that the filter argument is invalid.
:returns: A set of ids of documents that match the given filter.
"""
self._assert_open()
if filter:
filter = json.loads(json.dumps(filter)) # Normalize
if not _valid_filter(filter):
raise ValueError(filter)
result = self._find_result(filter)
return set(islice(result, limit if limit else None))
else:
return set(islice(self._docs.keys(), limit if limit else None))
def _host_id(hostcfg):
return json.dumps(hostcfg, sort_keys=True)
try:
tmp = self.read_statepoints(fn=fn)
except IOError as error:
if not error.errno == errno.ENOENT:
raise
tmp = dict()
if statepoints is None:
job_ids = self._job_dirs()
_cache = {_id: self._get_statepoint(_id) for _id in job_ids}
else:
_cache = {calc_id(sp): sp for sp in statepoints}
tmp.update(_cache)
logger.debug("Writing state points file with {} entries.".format(len(tmp)))
with open(fn, 'w') as file:
file.write(json.dumps(tmp, indent=indent))
def _get_cached_credentials(hostcfg, default):
    """Look up the credentials for a host configuration in the session cache.

    The cache key is the JSON serialization of ``hostcfg`` with sorted keys.
    On a cache miss, ``default`` is called and its result is stored in
    ``SESSION_PASSWORD_HASH_CACHE`` under that key.

    :param hostcfg: The JSON-serializable host configuration.
    :param default: A callable without arguments that provides the value
        to cache when no entry exists yet.
    :returns: The cached value for the given host configuration.
    """
    cache_key = json.dumps(hostcfg, sort_keys=True)
    if hostcfg_is_cached := cache_key in SESSION_PASSWORD_HASH_CACHE:
        logger.debug("Loading credentials from cache.")
        return SESSION_PASSWORD_HASH_CACHE[cache_key]
    # Cache miss: compute the value first, then store it without
    # overwriting an entry that may have appeared in the meantime.
    fresh_value = default()
    return SESSION_PASSWORD_HASH_CACHE.setdefault(cache_key, fresh_value)