Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
operations[:] = []
if len(operations):
logger.debug("Pushing final chunk.")
_export_pymongo(chunk, operations, index, mirrors, num_tries, timeout)
if update:
if ids:
stale = set()
for root in ids:
docs_ = index.find({'root': root})
all_ = {doc['_id'] for doc in docs_}
stale.update(all_.difference(ids[root]))
logger.info("Removing {} stale documents.".format(len(stale)))
for _id in set(stale):
index.delete_one(dict(_id=_id))
else:
raise errors.ExportError(
"The exported docs sequence is empty! Unable to update!")
"""Export docs via operations to index and files to mirrors."""
import pymongo
if mirrors is not None:
for mirror in mirrors:
for doc in docs:
if 'file_id' in doc:
export_to_mirror(doc, mirror, num_tries, timeout)
for i in range(num_tries):
try:
index.bulk_write(operations)
break
except pymongo.errors.AutoReconnect as error:
logger.warning(error)
sleep(timeout)
else:
raise errors.ExportError()
def export_to_mirror(doc, mirror, num_tries=3, timeout=60):
    """Export a file associated with doc to mirror.

    The file is opened with ``fetch(doc, mode='rb')`` and handed to
    ``_export_to_mirror`` together with the document's file id.

    :param doc: A document with a file_id entry.
    :param mirror: A file-system object to export the file to. Its
        ``FileExistsError`` and ``AutoRetry`` attributes are used as the
        exception types for "already exported" and "retry" respectively.
    :param num_tries: The number of automatic retry attempts in case of
        mirror connection errors.
    :type num_tries: int
    :param timeout: The time in seconds to wait before an
        automatic retry attempt.
    :type timeout: int
    :returns: The file id after successful export, or None if the mirror
        reported that the file had already been exported.
    :raises errors.ExportError: If doc has no file_id entry, or if all
        num_tries attempts ended in ``mirror.AutoRetry``.
    """
    if 'file_id' not in doc:
        raise errors.ExportError("Doc '{}' does not have a file_id entry.".format(doc))

    # A single retry loop: each attempt either finishes the export (return),
    # finds the file already present (return None), or hits a retryable
    # mirror error (wait, then try again).
    for _ in range(num_tries):
        try:
            with fetch(doc, mode='rb') as file:
                _export_to_mirror(file, doc['file_id'], mirror)
        except mirror.FileExistsError:
            logger.debug(
                "File with id '{}' already exported, skipping.".format(doc['file_id']))
            # Nothing to do; callers get None rather than the file id here.
            return None
        except mirror.AutoRetry as error:
            logger.warning("Error during export: '{}', retrying...".format(error))
            sleep(timeout)
        else:
            logger.debug(
                "Stored file with id '{}' in mirror '{}'.".format(doc['file_id'], mirror))
            return doc['file_id']
    else:
        # Reached only when every attempt was consumed by AutoRetry errors
        # (or num_tries is 0): give up and report the failing document.
        raise errors.ExportError(doc)