Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
class Method(object):
@property
def name(self):
return type(self).__name__.upper()[:-6]
async def handle(self, request, environ, app):
raise NotImplementedError(self.handle)
def allow(self, request):
"""Is this method allowed considering the specified request?"""
return True
class DeleteMethod(Method):
async def handle(self, request, environ, app):
unused_href, path, r = app._get_resource_from_environ(request, environ)
if r is None:
return _send_not_found(request)
container_path, item_name = posixpath.split(path.rstrip('/'))
pr = app.backend.get_resource(container_path)
if pr is None:
return _send_not_found(request)
current_etag = await r.get_etag()
if_match = request.headers.get('If-Match', None)
if if_match is not None and not etag_matches(if_match, current_etag):
return Response(status=412, reason='Precondition Failed')
pr.delete_member(item_name, current_etag)
return Response(status=204, reason='No Content')
unused_href, path, r = app._get_resource_from_environ(request, environ)
if r is None:
return _send_not_found(request)
container_path, item_name = posixpath.split(path.rstrip('/'))
pr = app.backend.get_resource(container_path)
if pr is None:
return _send_not_found(request)
current_etag = await r.get_etag()
if_match = request.headers.get('If-Match', None)
if if_match is not None and not etag_matches(if_match, current_etag):
return Response(status=412, reason='Precondition Failed')
pr.delete_member(item_name, current_etag)
return Response(status=204, reason='No Content')
class PostMethod(Method):
async def handle(self, request, environ, app):
# see RFC5995
new_contents = await _readBody(request)
unused_href, path, r = app._get_resource_from_environ(request, environ)
if r is None:
return _send_not_found(request)
if COLLECTION_RESOURCE_TYPE not in r.resource_types:
return _send_method_not_allowed(
app._get_allowed_methods(request))
content_type, params = parse_type(request.content_type)
try:
(name, etag) = r.create_member(None, new_contents, content_type)
except PreconditionFailure as e:
return _send_simple_dav_error(
request, '412 Precondition Failed',
elif requested.tag == '{DAV:}prop':
propstat = get_properties(
href, resource, app.properties, environ, requested)
elif requested.tag == '{DAV:}propname':
propstat = get_property_names(
href, resource, app.properties, environ, requested)
else:
raise BadRequestError(
'Expected prop/allprop/propname tag, got ' + requested.tag)
yield Status(href, '200 OK', propstat=[s async for s in propstat])
# By my reading of the WebDAV RFC, it should be legal to return
# '200 OK' here if Depth=0, but the RFC is not super clear and
# some clients don't seem to like it and prefer a 207 instead.
class ProppatchMethod(Method):
@multistatus
async def handle(self, request, environ, app):
href, unused_path, resource = app._get_resource_from_environ(
request, environ)
if resource is None:
yield Status(request.url, '404 Not Found')
return
et = await _readXmlBody(
request, '{DAV:}propertyupdate', strict=app.strict)
propstat = []
for el in et:
if el.tag not in ('{DAV:}set', '{DAV:}remove'):
raise BadRequestError('Unknown tag %s in propertyupdate'
% el.tag)
propstat.extend(apply_modify_prop(el, href, resource,
# RFC7231 requires that if there is no response body,
# Content-Length: 0 must be sent. This implies that there is
# content (albeit empty), and thus a 204 is not a valid reply.
# Thunderbird also fails if a 204 is sent rather than a 200.
return Response(status=200, reason='OK', headers=headers + [
('Content-Length', '0')])
class HeadMethod(Method):
async def handle(self, request, environ, app):
return await _do_get(request, environ, app, send_body=False)
class GetMethod(Method):
async def handle(self, request, environ, app):
return await _do_get(request, environ, app, send_body=True)
async def _do_get(request, environ, app, send_body):
unused_href, unused_path, r = app._get_resource_from_environ(
request, environ)
if r is None:
return _send_not_found(request)
accept_content_types = parse_accept_header(
request.headers.get('Accept', '*/*'))
accept_content_languages = parse_accept_header(
request.headers.get('Accept-Languages', '*'))
(
name, new_contents, content_type)
except PreconditionFailure as e:
return _send_simple_dav_error(
request, '412 Precondition Failed',
error=ET.Element(e.precondition),
description=e.description)
except InsufficientStorage:
return Response(status=507, reason='Insufficient Storage')
except ResourceLocked:
return Response(status=423, reason='Resource Locked')
return Response(
status=201, reason='Created', headers=[
('ETag', new_etag)])
class ReportMethod(Method):
async def handle(self, request, environ, app):
# See https://tools.ietf.org/html/rfc3253, section 3.6
base_href, unused_path, r = app._get_resource_from_environ(
request, environ)
if r is None:
return _send_not_found(request)
depth = request.headers.get("Depth", "0")
et = await _readXmlBody(request, None, strict=app.strict)
try:
reporter = app.reporters[et.tag]
except KeyError:
logging.warning('Client requested unknown REPORT %s', et.tag)
return _send_simple_dav_error(
request,
'403 Forbidden', error=ET.Element('{DAV:}supported-report'),
if r is None:
return _send_not_found(request)
dav_features = app._get_dav_features(r)
headers.append(('DAV', ', '.join(dav_features)))
allowed_methods = app._get_allowed_methods(request)
headers.append(('Allow', ', '.join(allowed_methods)))
# RFC7231 requires that if there is no response body,
# Content-Length: 0 must be sent. This implies that there is
# content (albeit empty), and thus a 204 is not a valid reply.
# Thunderbird also fails if a 204 is sent rather than a 200.
return Response(status=200, reason='OK', headers=headers + [
('Content-Length', '0')])
class HeadMethod(Method):
async def handle(self, request, environ, app):
return await _do_get(request, environ, app, send_body=False)
class GetMethod(Method):
async def handle(self, request, environ, app):
return await _do_get(request, environ, app, send_body=True)
async def _do_get(request, environ, app, send_body):
unused_href, unused_path, r = app._get_resource_from_environ(
request, environ)
if r is None:
return _send_not_found(request)
'403 Forbidden', error=ET.Element('{DAV:}supported-report'),
description=('Unknown report %s.' % et.tag)
)
if not reporter.supported_on(r):
return _send_simple_dav_error(
request,
'403 Forbidden', error=ET.Element('{DAV:}supported-report'),
description=('Report %s not supported on resource.' % et.tag)
)
return await reporter.report(
environ, et, functools.partial(
_get_resources_by_hrefs, app.backend, environ),
app.properties, base_href, r, depth)
class PropfindMethod(Method):
@multistatus
async def handle(self, request, environ, app):
base_href, unused_path, base_resource = (
app._get_resource_from_environ(request, environ))
if base_resource is None:
yield Status(request.url, '404 Not Found')
return
# Default depth is infinity, per RFC2518
depth = request.headers.get("Depth", "infinity")
if not request.can_read_body:
requested = None
else:
et = await _readXmlBody(
request, '{DAV:}propfind', strict=app.strict)
try:
propstat = []
for el in et:
if el.tag != '{DAV:}set':
raise BadRequestError('Unknown tag %s in mkcol' % el.tag)
propstat.extend(apply_modify_prop(el, href, resource,
app.properties))
ret = ET.Element('{DAV:}mkcol-response')
for propstat_el in propstat_as_xml(propstat):
ret.append(propstat_el)
return _send_xml_response(
'201 Created', ret, DEFAULT_ENCODING)
else:
return Response(status=201, reason='Created')
class OptionsMethod(Method):
async def handle(self, request, environ, app):
headers = []
if request.raw_path != '*':
unused_href, unused_path, r = (
app._get_resource_from_environ(request, environ))
if r is None:
return _send_not_found(request)
dav_features = app._get_dav_features(r)
headers.append(('DAV', ', '.join(dav_features)))
allowed_methods = app._get_allowed_methods(request)
headers.append(('Allow', ', '.join(allowed_methods)))
# RFC7231 requires that if there is no response body,
# Content-Length: 0 must be sent. This implies that there is
# content (albeit empty), and thus a 204 is not a valid reply.
return _send_simple_dav_error(
request, '412 Precondition Failed',
error=ET.Element(e.precondition),
description=e.description)
except InsufficientStorage:
return Response(status=507, reason='Insufficient Storage')
except ResourceLocked:
return Response(status=423, reason='Resource Locked')
href = (
environ['SCRIPT_NAME'] +
urllib.parse.urljoin(ensure_trailing_slash(path), name)
)
return Response(headers={'Location': href})
class PutMethod(Method):
async def handle(self, request, environ, app):
new_contents = await _readBody(request)
unused_href, path, r = app._get_resource_from_environ(request, environ)
if r is not None:
current_etag = await r.get_etag()
else:
current_etag = None
if_match = request.headers.get('If-Match', None)
if if_match is not None and not etag_matches(if_match, current_etag):
return Response(status='412 Precondition Failed')
if_none_match = request.headers.get('If-None-Match', None)
if if_none_match and etag_matches(if_none_match, current_etag):
return Response(status='412 Precondition Failed')
if r is not None:
# Item already exists; update it
assert end.tzinfo
ret = ICalendar()
ret['VERSION'] = '2.0'
ret['PRODID'] = PRODID
fb = FreeBusy()
fb['DTSTAMP'] = vDDDTypes(tzify(datetime.datetime.now()))
fb['DTSTART'] = vDDDTypes(start)
fb['DTEND'] = vDDDTypes(end)
fb['FREEBUSY'] = [item async for item in iter_freebusy(
webdav.traverse_resource(base_resource, base_href, depth),
start, end, tzify)]
ret.add_component(fb)
return webdav.Response(status='200 OK', body=[ret.to_ical()])
class MkcalendarMethod(webdav.Method):
async def handle(self, request, environ, app):
content_type = request.content_type
base_content_type, params = webdav.parse_type(content_type)
if base_content_type not in (
'text/xml', 'application/xml', None, 'text/plain',
'application/octet-stream',
):
raise webdav.UnsupportedMediaType(content_type)
href, path, resource = app._get_resource_from_environ(request, environ)
if resource is not None:
return webdav._send_simple_dav_error(
request,
'403 Forbidden',
error=ET.Element('{DAV:}resource-must-be-null'),
description=('Something already exists at %r' % path))