Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def extract_file(self, zipf, name, temp_dir):
base_name = safe_filename(os.path.basename(name))
out_file = os.path.join(temp_dir, base_name)
with open(out_file, 'w+b') as outfh:
try:
with zipf.open(name) as infh:
shutil.copyfileobj(infh, outfh)
except KeyError:
log.warning("Cannot load zip member: %s", name)
return out_file
def make_filename(self, entity):
"""Some of the file importers actually care about the file
extension, so this is trying to make sure we use a temporary
file name that has an appropriate extension."""
for file_name in entity.get('fileName', quiet=True):
_, extension = os.path.splitext(file_name)
if len(extension):
return safe_filename(file_name)
extension = first(entity.get('extension', quiet=True))
if extension is None:
mime_type = first(entity.get('mimeType', quiet=True))
if mime_type is not None:
extension = guess_extension(mime_type)
extension = extension or 'bin'
return safe_filename('data', extension=extension)
def make_filename(self, entity):
"""Some of the file importers actually care about the file
extension, so this is trying to make sure we use a temporary
file name that has an appropriate extension."""
for file_name in entity.get('fileName', quiet=True):
_, extension = os.path.splitext(file_name)
if len(extension):
return safe_filename(file_name)
extension = first(entity.get('extension', quiet=True))
if extension is None:
mime_type = first(entity.get('mimeType', quiet=True))
if mime_type is not None:
extension = guess_extension(mime_type)
extension = extension or 'bin'
return safe_filename('data', extension=extension)
def extract_file(self, zipf, name):
"""Extract a message file from the OLM zip archive"""
path = pathlib.Path(name)
base_name = safe_filename(path.name)
out_file = self.make_work_file(base_name)
with open(out_file, 'w+b') as outfh:
try:
with zipf.open(name) as infh:
shutil.copyfileobj(infh, outfh)
except KeyError:
log.warning("Cannot load zip member: %s", name)
return out_file
content_hash = first(properties.get('contentHash'))
if content_hash:
mime_type = first(properties.get('mimeType'))
name = safe_filename(file_name, default=pk)
links['file'] = archive_url(request.authz.id, content_hash,
file_name=name,
mime_type=mime_type)
pdf_hash = first(properties.get('pdfHash'))
if pdf_hash:
name = safe_filename(file_name, default=pk, extension='.pdf')
links['pdf'] = archive_url(request.authz.id, pdf_hash,
file_name=name, mime_type=PDF)
csv_hash = first(properties.get('csvHash'))
if csv_hash:
name = safe_filename(file_name, default=pk, extension='.csv')
links['csv'] = archive_url(request.authz.id, csv_hash,
file_name=name, mime_type=CSV)
obj['links'] = links
obj['writeable'] = authz.can(collection_id, authz.WRITE)
obj.pop('_index', None)
return self._clean_response(obj)
def ingest(self, file_path):
self.result.flag(self.result.FLAG_WORKBOOK)
for table_name in self.get_tables(file_path):
csv_name = safe_filename(table_name, extension='csv')
csv_path = join_path(self.work_path, csv_name)
self.dump_table(file_path, table_name, csv_path)
child_id = join_path(self.result.id, table_name)
self.manager.handle_child(self.result, csv_path,
id=child_id,
title=table_name,
file_name=csv_name,
mime_type='text/csv')