Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Histogram metric recording how long conversions from HTML via Calibre take
PROM_CALIBRE_FROMHTML_TIME = prometheus_client.Histogram(
    name='convert_calibre_fromhtml_seconds',
    documentation=(
        "Time to convert from HTML using Calibre (calibre_from_html())"
    ),
    buckets=BUCKETS,
)
# File name extensions of HTML documents (lowercase, with the leading dot).
# Kept as a tuple so it can be passed directly to str.endswith().
HTML_EXTENSIONS = (
    '.htm',
    '.html',
    '.xhtml',
)
class ConversionError(ValueError):
    """Raised when a document could not be converted."""
class UnsupportedFormat(ConversionError):
    """Raised when the document is in a format that is not supported."""
# Grace period, in seconds, to wait after SIGTERM before sending SIGKILL
PROC_TERM_GRACE = 5
async def check_call(cmd, timeout):
proc = await asyncio.create_subprocess_exec(cmd[0], *cmd[1:])
try:
retcode = await asyncio.wait_for(proc.wait(), timeout=timeout)
except asyncio.TimeoutError:
logger.error(
"Process didn't finish before %ds timeout: %r",
timeout, cmd,
)
async def post(self, project_id):
project = self.get_project(project_id)
name = self.get_body_argument('name')
description = self.get_body_argument('description')
file = self.request.files['file'][0]
content_type = file.content_type
filename = secure_filename(file.filename)
try:
body = await convert.to_html_chunks(file.body, content_type,
filename)
except convert.ConversionError as err:
self.set_status(400)
self.send_json({
'error': str(err),
})
else:
doc = database.Document(
name=name,
description=description,
filename=filename,
project=project,
contents=body,
)
self.db.add(doc)
self.db.flush() # Need to flush to get doc.id
cmd = database.Command.document_add(
self.current_user,
validate.document_name(name)
description = self.get_body_argument('description')
validate.document_description(description)
try:
file = self.request.files['file'][0]
except (KeyError, IndexError):
raise MissingArgumentError('file')
content_type = file.content_type
filename = validate.filename(file.filename)
try:
body = await convert.to_html_chunks(
file.body, content_type, filename,
self.application.config,
)
except convert.ConversionError as err:
self.set_status(400)
return self.send_json({
'error': str(err),
})
else:
doc = database.Document(
name=name,
description=description,
filename=filename,
project=project,
contents=body,
)
self.db.add(doc)
self.db.flush() # Need to flush to get doc.id
cmd = database.Command.document_add(
self.current_user,
convert = os.path.join(os.environ['CALIBRE'], convert)
cmd = [convert, input_filename, output_dir, '--enable-heuristics']
cmd_again = [convert, input_filename, output_dir]
if os.path.splitext(input_filename)[1].lower() == '.pdf':
cmd.append('--no-images')
logger.info("Running: %s", ' '.join(cmd))
try:
try:
await check_call(cmd, config['CONVERT_TO_HTML_TIMEOUT'])
except asyncio.TimeoutError:
logger.error("Calibre timed out, trying again without "
"heuristics...")
try:
await check_call(cmd_again, config['CONVERT_TO_HTML_TIMEOUT'])
except asyncio.TimeoutError:
raise ConversionError("Calibre timed out")
except CalledProcessError as e:
raise ConversionError("Calibre returned %d" % e.returncode)
logger.info("ebook-convert successful")
# Locate OEB manifest
manifests = [e.lower() for e in os.listdir(output_dir)]
manifests = [e for e in manifests if e.endswith('.opf')]
if not manifests:
logger.error("No OPF manifest in Calibre's output")
raise ConversionError("Invalid output from Calibre")
elif manifests == ['content.opf']:
manifest = 'content.opf' # All good
elif len(manifests) > 1 and 'content.opf' in manifests:
logger.warning("Calibre's output contains multiple OPF "
"manifests! Using content.opf")
manifest = 'content.opf'
"%r" % manifests)
raise ConversionError("Invalid output from Calibre")
size = os.stat(os.path.join(output_dir, manifest)).st_size
if size > config['OPF_OUT_SIZE_LIMIT']:
logger.warning("OPF manifest is %d bytes; aborting", size)
raise ConversionError("File is too long")
# Open OEB manifest
logger.info("Parsing OPF manifest %s", manifest)
tree = ElementTree.parse(os.path.join(output_dir, manifest))
root = tree.getroot()
ns = '{http://www.idpf.org/2007/opf}'
if root.tag not in ('package', ns + 'package'):
logger.error("Invalid root tag in OPF manifest: %r", root.tag)
raise ConversionError("Invalid output from Calibre")
manifests = [tag for tag in root
if tag.tag in ('manifest', ns + 'manifest')]
if len(manifests) != 1:
logger.error("OPF has %d nodes", len(manifests))
raise ConversionError("Invalid output from Calibre")
manifest, = manifests
spines = [tag for tag in root
if tag.tag in ('spine', ns + 'spine')]
if len(spines) != 1:
logger.error("OPF has %d nodes", len(spines))
raise ConversionError("Invalid output from Calibre")
spine, = spines
# Read
items = {}
for item in manifest:
output_filename = os.path.join(tmp, 'output.html')
# Run WV
convert = 'wvHtml'
if os.environ.get('WVHTML'):
convert = os.environ['WVHTML']
cmd = [convert, input_filename, output_filename]
logger.info("Running: %s", ' '.join(cmd))
try:
await check_call(cmd, config['CONVERT_TO_HTML_TIMEOUT'])
except OSError:
raise ConversionError("Can't call wvHtml")
except CalledProcessError as e:
raise ConversionError("wvHtml returned %d" % e.returncode)
except asyncio.TimeoutError:
raise ConversionError("wvHtml timed out")
logger.info("wvHtml successful")
# Read output
with open(output_filename, 'rb') as fp:
return get_html_body(fp.read())
async def wvware_to_html(input_filename, tmp, config):
    """Convert a document to HTML by running the wvHtml program.

    wvHtml is asked to write ``output.html`` inside `tmp`; that file is
    then read back and passed through get_html_body().

    :param input_filename: Path of the document handed to wvHtml.
    :param tmp: Directory in which the output file is created.
    :param config: Mapping providing ``CONVERT_TO_HTML_TIMEOUT``, the
        timeout passed to check_call().
    :return: The result of get_html_body() on the bytes of the output file.
    :raises ConversionError: If check_call() raises asyncio.TimeoutError,
        CalledProcessError or OSError while running wvHtml.
    """
    PROM_WVWARE_TOHTML.inc()

    output_filename = os.path.join(tmp, 'output.html')

    # Run WV; a non-empty WVHTML environment variable overrides the executable
    convert = os.environ.get('WVHTML') or 'wvHtml'
    cmd = [convert, input_filename, output_filename]
    logger.info("Running: %s", ' '.join(cmd))
    try:
        await check_call(cmd, config['CONVERT_TO_HTML_TIMEOUT'])
    except asyncio.TimeoutError as e:
        # Must be tested before OSError: since Python 3.11
        # asyncio.TimeoutError is the builtin TimeoutError, a subclass of
        # OSError, so the OSError clause would otherwise swallow timeouts
        # and report them as "Can't call wvHtml"
        raise ConversionError("wvHtml timed out") from e
    except CalledProcessError as e:
        raise ConversionError("wvHtml returned %d" % e.returncode) from e
    except OSError as e:
        raise ConversionError("Can't call wvHtml") from e
    logger.info("wvHtml successful")

    # Read output
    with open(output_filename, 'rb') as fp:
        return get_html_body(fp.read())
logger.warning("Ignoring item %r, mimetype=%r",
idref, output_mimetype)
continue
output_filename = os.path.join(output_dir, output_name)
if not os.path.isfile(output_filename):
logger.error("Missing file from output dir: %r",
output_name)
raise ConversionError("Invalid output from Calibre")
# Read output
logger.info("Reading in %r", output_name)
size += os.stat(output_filename).st_size
if size > config['HTML_OUT_SIZE_LIMIT']:
logger.error("File is %d bytes for a total of %d bytes; aborting",
os.stat(output_filename).st_size, size)
raise ConversionError("File is too long")
with open(output_filename, 'rb') as fp:
output.append(get_html_body(fp.read()))
# TODO: Store media files
# Assemble output
return '\n'.join(output)