Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def send_basic_auth_response(self, environ, start_response):
realm = self.domain_controller.get_domain_realm(environ["PATH_INFO"], environ)
_logger.debug("401 Not Authorized for realm '{}' (basic)".format(realm))
wwwauthheaders = 'Basic realm="' + realm + '"'
body = compat.to_bytes(self.error_message_401)
start_response(
"401 Not Authorized",
[
("WWW-Authenticate", wwwauthheaders),
("Content-Type", "text/html"),
("Content-Length", str(len(body))),
("Date", util.get_rfc1123_time()),
],
)
return [body]
if name == "{DAV:}creationdate" and self.get_creation_date() is not None:
# Note: uses RFC3339 format (ISO 8601)
return util.get_rfc3339_time(self.get_creation_date())
elif name == "{DAV:}getcontenttype" and self.get_content_type() is not None:
return self.get_content_type()
elif name == "{DAV:}resourcetype":
if self.is_collection:
resourcetypeEL = etree.Element(name)
etree.SubElement(resourcetypeEL, "{DAV:}collection")
return resourcetypeEL
return ""
elif (
name == "{DAV:}getlastmodified" and self.get_last_modified() is not None
):
# Note: uses RFC1123 format
return util.get_rfc1123_time(self.get_last_modified())
elif (
name == "{DAV:}getcontentlength"
and self.get_content_length() is not None
):
# Note: must be a numeric string
return str(self.get_content_length())
elif name == "{DAV:}getetag" and self.get_etag() is not None:
return self.get_etag()
elif name == "{DAV:}displayname" and self.get_display_name() is not None:
return self.get_display_name()
# Unsupported, no persistence available, or property not found
raise DAVError(HTTP_NOT_FOUND)
# Dead property
pm = self.provider.prop_manager
# obtain_content_ranges supports more than one range in case the above
# behaviour changes in future
(range_start, range_end, range_length) = list_ranges[0]
else:
(range_start, range_end, range_length) = (0, filesize - 1, filesize)
# Content Processing
mimetype = res.get_content_type() # provider.get_content_type(path)
response_headers = []
if res.support_content_length():
# Content-length must be of type string
response_headers.append(("Content-Length", str(range_length)))
if res.support_modified():
response_headers.append(
("Last-Modified", util.get_rfc1123_time(last_modified))
)
response_headers.append(("Content-Type", mimetype))
response_headers.append(("Date", util.get_rfc1123_time()))
if res.support_etag():
response_headers.append(("ETag", '"{}"'.format(entitytag)))
if res.support_ranges():
response_headers.append(("Accept-Ranges", "bytes"))
if "response_headers" in environ["wsgidav.config"]:
customHeaders = environ["wsgidav.config"]["response_headers"]
for header, value in customHeaders:
response_headers.append((header, value))
res.finalize_headers(environ, response_headers)
# Skip ignore patterns
ignore = False
for pat in ignore_patterns:
if fnmatch(entry["display_name"], pat):
ignored_list.append(entry["display_name"])
# _logger.debug("Ignore {}".format(entry["display_name"]))
ignore = True
break
if ignore:
continue
#
last_modified = entry.get("last_modified")
if last_modified is None:
entry["str_modified"] = ""
else:
entry["str_modified"] = util.get_rfc1123_time(last_modified)
entry["str_size"] = "-"
if not entry.get("is_collection"):
content_length = entry.get("content_length")
if content_length is not None:
entry["str_size"] = util.byte_number_string(content_length)
rows.append(entry)
if ignored_list:
_logger.debug(
"Dir browser ignored {} entries: {}".format(
len(ignored_list), ignored_list
)
)
# sort
"""
@see http://www.webdav.org/specs/rfc4918.html#HEADER_DAV
"""
path = environ["PATH_INFO"]
provider = self._davProvider
res = provider.get_resource_inst(path, environ)
dav_compliance_level = "1,2"
if provider is None or provider.is_readonly() or provider.lock_manager is None:
dav_compliance_level = "1"
headers = [
("Content-Type", "text/html"),
("Content-Length", "0"),
("DAV", dav_compliance_level),
("Date", util.get_rfc1123_time()),
]
if path == "/":
path = "*" # Hotfix for WinXP
if path == "*":
# Answer HTTP 'OPTIONS' method on server-level.
# From RFC 2616
# If the Request-URI is an asterisk ("*"), the OPTIONS request is
# intended to apply to the server in general rather than to a specific
# resource. Since a server's communication options typically depend on
# the resource, the "*" request is only useful as a "ping" or "no-op"
# type of method; it does nothing beyond allowing the client to test the
# capabilities of the server. For example, this can be used to test a
# proxy for HTTP/1.1 compliance (or lack thereof).
start_response("200 OK", headers)
)
_logger.debug(
"401 Not Authorized for realm '{}' (digest): {}".format(
realm, wwwauthheaders
)
)
body = compat.to_bytes(self.error_message_401)
start_response(
"401 Not Authorized",
[
("WWW-Authenticate", wwwauthheaders),
("Content-Type", "text/html"),
("Content-Length", str(len(body))),
("Date", util.get_rfc1123_time()),
],
)
return [body]
else:
(range_start, range_end, range_length) = (0, filesize - 1, filesize)
# Content Processing
mimetype = res.get_content_type() # provider.get_content_type(path)
response_headers = []
if res.support_content_length():
# Content-length must be of type string
response_headers.append(("Content-Length", str(range_length)))
if res.support_modified():
response_headers.append(
("Last-Modified", util.get_rfc1123_time(last_modified))
)
response_headers.append(("Content-Type", mimetype))
response_headers.append(("Date", util.get_rfc1123_time()))
if res.support_etag():
response_headers.append(("ETag", '"{}"'.format(entitytag)))
if res.support_ranges():
response_headers.append(("Accept-Ranges", "bytes"))
if "response_headers" in environ["wsgidav.config"]:
customHeaders = environ["wsgidav.config"]["response_headers"]
for header, value in customHeaders:
response_headers.append((header, value))
res.finalize_headers(environ, response_headers)
if ispartialranges:
# response_headers.append(("Content-Ranges", "bytes " + str(range_start) + "-" +
# str(range_end) + "/" + str(range_length)))
("Date", util.get_rfc1123_time()),
],
)
return [res]
context = self._get_context(environ, dav_res)
res = self.template.render(**context)
res = compat.to_bytes(res)
start_response(
"200 OK",
[
("Content-Type", "text/html"),
("Content-Length", str(len(res))),
("Cache-Control", "private"),
("Date", util.get_rfc1123_time()),
],
)
return [res]
return self.next_app(environ, start_response)