Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
self.headers['Pragma'] = 'cache'
self.headers['Cache-Control'] = 'private'
self.headers['Content-Length'] = str(self.size)
if self.etag:
self.headers.add_header('Etag', self.etag)
if self.expires:
self.headers.add_header('Expires', self.expires)
try:
# Implement 206 partial file support.
start, end = headers['range'].split('-')
start = 0 if not start.isdigit() else int(start)
end = self.size if not end.isdigit() else int(end)
if self.size < end or start < 0:
self.status = "214 Unsatisfiable Range Requested"
self.data = FileWrapper(f, CHUNK_SIZE)
else:
f.seek(start)
self.data = LimitingFileWrapper(f, blksize=CHUNK_SIZE, limit=end)
self.status = "206 Partial Content"
except:
self.data = FileWrapper(f, CHUNK_SIZE)
except IOError:
self.status = "403 Forbidden"
def serve_static(path: Path, statics: Statics, environ: wsgi.WSGIEnviron) -> wsgi.WSGIResponse:
if not path.startswith('/'):
path = Path('/' + path)
static_file = statics.whitenoise.files.get(path)
if static_file is None:
raise exceptions.NotFound()
response = static_file.get_response(environ['REQUEST_METHOD'], environ)
status_line = '{} {}'.format(response.status, response.status.phrase)
headers = list(response.headers)
if response.file is not None:
file_wrapper = environ.get('wsgi.file_wrapper', FileWrapper)
content = file_wrapper(response.file)
else:
# We hit this branch for HEAD requests
content = []
return wsgi.WSGIResponse(status_line, headers, content)
def download_logs(request, api, account_info, config, username):
date = datetime.datetime.now().strftime('%Y-%m-%d_%H:%M:%S')
filename = 'openmanage-logs-%s.tar.bz2' % date
path = '/opt/openmanage/tmp_logs/%s' % filename
subprocess.call(['/opt/openmanage/bin/gather_logs.sh', date])
response = HttpResponse(FileWrapper(open(path)),
content_type='application/bzip2')
response['Content-Disposition'] = 'attachment; filename=%s' % filename
return response
def get_file_wrapper(environ: WSGIEnviron):
return environ.get('wsgi.file_wrapper', FileWrapper)
self.start_response('404 %s' % (self.STATUS_MESSAGE[404], ), [('Content-type', 'text/html; charset=utf-8'), ('Cache-Control', 'public, max-age=3600')])
return self.error_page(404)
else:
path = os.path.join(self.directory, self.asset)
if not os.path.isfile(path):
self.start_response('404 %s' % (self.STATUS_MESSAGE[404], ), [('Content-type', 'text/html; charset=utf-8'), ('Cache-Control', 'public, max-age=3600')])
return self.error_page(404)
elif self.environ['REQUEST_METHOD'] != 'GET':
self.start_response('405 %s' % (self.STATUS_MESSAGE[405], ), [('Content-type', 'text/html; charset=utf-8'), ('Accept', 'GET, HEAD, OPTIONS'), ('Cache-Control', 'public, max-age=3600')])
return self.error_page(405)
else:
try:
f = open(path, 'rb')
type, encoding = mimetypes.guess_type(path)
self.start_response('200 OK', [('Content-type', type or 'application/octet-stream'), ('Cache-Control', 'public, max-age=3600')])
return wsgiref.util.FileWrapper(f)
except:
self.start_response('500 %s' % (self.STATUS_MESSAGE[500], ), [('Content-type', 'text/html; charset=utf-8'), ('Cache-Control', 'public, max-age=3600')])
return self.error_page(500)
file_content = storage.open(file_data.file_name)
except Exception as ex:
print({}.format(ex)) # DEBUG
messages.error(self.request, 'Error opening file!')
return redirect(
reverse(
'filesfolders:list', kwargs={'project': kwargs['project']}
)
)
# Return file as attachment
response = HttpResponse(
FileWrapper(file_content), content_type=file_data.content_type
)
if SERVE_AS_ATTACHMENT:
response['Content-Disposition'] = 'attachment; filename={}'.format(
file.name
)
if not self.request.user.is_anonymous:
# Add event in Timeline
if timeline:
tl_event = timeline.add_event(
project=file.project,
app_name=APP_NAME,
user=self.request.user,
event_name='file_serve',
description='serve file {file}',
def download_file(request, file_id):
if request.method == 'POST':
return BridgeErrorResponse(301)
try:
source = jobs.utils.FileSystem.objects.get(pk=file_id)
except ObjectDoesNotExist:
return BridgeErrorResponse(_('The file was not found'))
if source.file is None:
logger.error('Trying to download directory')
return BridgeErrorResponse(500)
mimetype = mimetypes.guess_type(os.path.basename(source.name))[0]
response = StreamingHttpResponse(FileWrapper(source.file.file, 8192), content_type=mimetype)
response['Content-Length'] = len(source.file.file)
response['Content-Disposition'] = "attachment; filename=%s" % quote(source.name)
return response
if PY3K:
from email.utils import formatdate
else:
# Caps Utils for Py2.4 compatibility
from email.Utils import formatdate
# Define Constants
NEWLINE = b('\r\n')
HEADER_RESPONSE = '''HTTP/1.1 %s\r\n%s'''
BASE_ENV = {'SERVER_NAME': SERVER_NAME,
'SCRIPT_NAME': '', # Direct call WSGI does not need a name
'wsgi.errors': sys.stderr,
'wsgi.version': (1, 0),
'wsgi.multiprocess': False,
'wsgi.run_once': False,
'wsgi.file_wrapper': FileWrapper
}
class WSGIWorker(Worker):
def __init__(self, *args, **kwargs):
"""Builds some instance variables that will last the life of the
thread."""
Worker.__init__(self, *args, **kwargs)
if isinstance(self.app_info, dict):
multithreaded = self.app_info.get('max_threads') != 1
else:
multithreaded = False
self.base_environ = dict(
{'SERVER_SOFTWARE': self.app_info['server_software'],
'wsgi.multithread': multithreaded,
def download_file_response(request, filePath, content_disposition):
filename = os.path.basename(filePath)
filesize = os.stat(filePath).st_size
file = open(filePath, "rb")
# More than 100mb, normal http response, otherwise stream
# Django docs say to avoid streaming when possible
stream = filesize > 1e8 or request.GET.get('_force_stream', False)
if stream:
response = FileResponse(file)
else:
response = HttpResponse(FileWrapper(file),
content_type=(mimetypes.guess_type(filename)[0] or "application/zip"))
response['Content-Type'] = mimetypes.guess_type(filename)[0] or "application/zip"
response['Content-Disposition'] = "{}; filename={}".format(content_disposition, filename)
response['Content-Length'] = filesize
# For testing
if stream:
response['_stream'] = 'yes'
return response
def __get_bulk_size(bulk_size=None):
"""
Get the size of chunks data should be loaded in.
Args:
bulk_size: Custom bulksize
Returns:
int: ``bulk_size`` or :class:`wsgiref.util.FileWrapper` default of 8192.
"""
if bulk_size is None:
return FileWrapper(None).blksize
return bulk_size