Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def delete_existing(file):
'''Delete the given file.'''
# Check if it's actually a directory.
if path.isdir(file):
if __debug__: log('doing rmtree on directory {}', file)
try:
shutil.rmtree(file)
except:
if __debug__: log('unable to rmtree {}; will try renaming', file)
try:
rename_existing(file)
except:
if __debug__: log('unable to rmtree or rename {}', file)
else:
if __debug__: log('doing os.remove on file {}', file)
os.remove(file)
def rename(f):
backup = f + '.bak'
# If we fail, we just give up instead of throwing an exception.
try:
os.rename(f, backup)
if __debug__: log('renamed {} to {}', file, backup)
except:
try:
delete_existing(backup)
os.rename(f, backup)
except:
if __debug__: log('failed to delete {}', backup)
if __debug__: log('failed to rename {} to {}', file, backup)
def targets_from_arguments(self, files, from_file):
targets = []
if from_file:
if __debug__: log('reading {}', from_file)
targets = filter(None, open(from_file).read().splitlines())
else:
for item in files:
if is_url(item):
targets.append(item)
elif path.isfile(item) and filename_extension(item) in ACCEPTED_FORMATS:
targets.append(item)
elif path.isdir(item):
# It's a directory, so look for files within.
targets += files_in_directory(item, extensions = ACCEPTED_FORMATS)
else:
warn('"{}" not a file or directory', item)
# Filter files created in past runs.
targets = filter(lambda name: '.handprint' not in name, targets)
def delete_existing(file):
'''Delete the given file.'''
# Check if it's actually a directory.
if path.isdir(file):
if __debug__: log('doing rmtree on directory {}', file)
try:
shutil.rmtree(file)
except:
if __debug__: log('unable to rmtree {}; will try renaming', file)
try:
rename_existing(file)
except:
if __debug__: log('unable to rmtree or rename {}', file)
else:
if __debug__: log('doing os.remove on file {}', file)
os.remove(file)
def init_credentials(self):
'''Initializes the credentials to use for accessing this service.'''
try:
if __debug__: log('initializing credentials')
self._credentials = MicrosoftCredentials().creds()
except Exception as ex:
raise AuthFailure(str(ex))
if __debug__: log('received {} bytes', len(response.content))
return response
except Exception as ex:
# Problem might be transient. Don't quit right away.
if __debug__: log('exception: {}', str(ex))
failures += 1
# Record the first error we get, not the subsequent ones, because
# in the case of network outages, the subsequent ones will be
# about being unable to reconnect and not the original problem.
if not error:
error = ex
if failures >= _MAX_FAILURES:
# Try pause & continue, in case of transient network issues.
if retries < _MAX_RETRIES:
retries += 1
if __debug__: log('pausing because of consecutive failures')
sleep(60 * retries)
failures = 0
else:
# We've already paused & restarted once.
raise error
if network_available():
raise ServiceFailure(addurl('Timed out reading data from server'))
else:
raise NetworkFailure(addurl('Timed out reading data over network'))
except requests.exceptions.InvalidSchema as ex:
raise NetworkFailure(addurl('Unsupported network protocol'))
except Exception as ex:
raise
# Interpret the response.
code = req.status_code
if code == 202:
# Code 202 = Accepted, "received but not yet acted upon."
sleep(1) # Sleep a short time and try again.
recursing += 1
if __debug__: log('Calling download() recursively for http code 202')
download(url, user, password, local_destination, recursing)
elif 200 <= code < 400:
# This started as code in https://stackoverflow.com/a/13137873/743730
# Note: I couldn't get the shutil.copyfileobj approach to work; the
# file always ended up zero-length. I couldn't figure out why.
with open(local_destination, 'wb') as f:
for chunk in req.iter_content(chunk_size = 1024):
if chunk:
f.write(chunk)
req.close()
elif code in [401, 402, 403, 407, 451, 511]:
raise AuthFailure(addurl('Access is forbidden'))
elif code in [404, 410]:
raise NoContent(addurl('No content found'))
elif code in [405, 406, 409, 411, 412, 414, 417, 428, 431, 505, 510]:
raise InternalError(addurl('Server returned code {}'.format(code)))
if __debug__: log('calling Amazon API function')
response = getattr(client, api_method)( **{ image_keyword : {'Bytes': image} })
if __debug__: log('received {} blocks', len(response[response_key]))
full_text = ''
boxes = []
width, height = imagesize.get(file_path)
for block in response[response_key]:
if value_key in block and block[value_key] == "WORD":
text = block[block_key]
full_text += (text + ' ')
corners = corner_list(block['Geometry']['Polygon'], width, height)
if corners:
boxes.append(TextBox(boundingBox = corners, text = text))
else:
# Something's wrong with the vertex list. Skip & continue.
if __debug__: log('bad bb for {}: {}', text, bb)
return TRResult(path = file_path, data = response, boxes = boxes,
text = full_text, error = None)
except KeyboardInterrupt as ex:
raise
except Exception as ex:
text = 'Error: {} -- {}'.format(str(ex), file_path)
return TRResult(path = file_path, data = {}, boxes = [],
text = '', error = text)
if dest_file is None:
dest_file = filename_basename(file) + '.' + dest_format
# PIL is unable to read PDF files, so in that particular case, we have to
# convert it using another tool.
if filename_extension(orig_file) == '.pdf':
import fitz
doc = fitz.open(orig_file)
if len(doc) >= 1:
if len(doc) >= 2:
if __debug__: log('{} has > 1 images; using only 1st', orig_file)
# FIXME: if there's more than 1 image, we could extra the rest.
# Doing so will require some architectural changes first.
if __debug__: log('extracting 1st image from {}', dest_file)
page = doc[0]
pix = page.getPixmap(alpha = False)
if __debug__: log('writing {}', dest_file)
pix.writeImage(dest_file, dest_format)
return (dest_file, None)
else:
if __debug__: log('fitz says there is no image image in {}', orig_file)
return (None, 'Cannot find an image inside {}'.format(orig_file))
else:
# When converting images, PIL may issue a DecompressionBombWarning but
# it's not a concern in our application. Ignore it.
with warnings.catch_warnings():
warnings.simplefilter('ignore')
try:
im = Image.open(orig_file)
if __debug__: log('converting {} to RGB', orig_file)
im.convert('RGB')
if __debug__: log('saving converted image to {}', dest_file)
if orig_file == dest_file:
def delete_existing(file):
'''Delete the given file.'''
# Check if it's actually a directory.
if path.isdir(file):
if __debug__: log('doing rmtree on directory {}', file)
try:
shutil.rmtree(file)
except:
if __debug__: log('unable to rmtree {}; will try renaming', file)
try:
rename_existing(file)
except:
if __debug__: log('unable to rmtree or rename {}', file)
else:
if __debug__: log('doing os.remove on file {}', file)
os.remove(file)