Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
class Method(object):
    """Base class for request-method handlers.

    Subclasses are expected to override ``handle``; ``allow`` may be
    overridden to make a method unavailable for particular requests.
    """

    @property
    def name(self):
        """Upper-cased class name with its last six characters dropped.

        For a subclass called ``FooMethod`` this evaluates to ``'FOO'``.
        """
        shouted = type(self).__name__.upper()
        return shouted[:-6]

    async def handle(self, request, environ, app):
        """Process a request; the base implementation always raises.

        :raises NotImplementedError: always, with the bound method as argument
        """
        raise NotImplementedError(self.handle)

    def allow(self, request):
        """Report whether this method may be used for ``request``.

        The base implementation permits every request.
        """
        return True
class DeleteMethod(Method):
    """Request handler that removes a resource from its parent container."""

    async def handle(self, request, environ, app):
        """Delete the resource addressed by the request.

        :param request: incoming request; its ``If-Match`` header, when
            present, is compared against the resource's current ETag
        :param environ: request environment used to look up the resource
        :param app: application object providing the resource lookup and
            the backend
        :return: a "not found" response when either the resource or its
            parent container cannot be resolved, a 412 response when the
            ``If-Match`` precondition does not hold, otherwise a 204
            response after the member has been deleted
        """
        unused_href, path, r = app._get_resource_from_environ(request, environ)
        if r is None:
            return _send_not_found(request)
        # Deletion goes through the parent container, so split the path into
        # the container's path and the member's name within it.
        container_path, item_name = posixpath.split(path.rstrip('/'))
        pr = app.backend.get_resource(container_path)
        if pr is None:
            return _send_not_found(request)
        current_etag = await r.get_etag()
        if_match = request.headers.get('If-Match', None)
        if if_match is not None and not etag_matches(if_match, current_etag):
            return Response(status=412, reason='Precondition Failed')
        # The ETag that was just checked is handed to the container together
        # with the member name.
        pr.delete_member(item_name, current_etag)
        return Response(status=204, reason='No Content')
class PostMethod(Method):
async def handle(self, request, environ, app):
# see RFC5995
new_contents = await _readBody(request)
unused_href, path, r = app._get_resource_from_environ(request, environ)
if r is None:
return _send_not_found(request)
if COLLECTION_RESOURCE_TYPE not in r.resource_types:
return _send_method_not_allowed(
app._get_allowed_methods(request))
content_type, params = parse_type(request.content_type)
try:
(name, etag) = r.create_member(None, new_contents, content_type)
except PreconditionFailure as e:
return _send_simple_dav_error(
request, '412 Precondition Failed',
def apply_filter(el, resource):
    """Evaluate a filter element against a resource.

    :param el: filter element (or None); its ``test`` attribute selects how
        the child filters are combined: ``'allof'`` requires every child to
        pass, ``'anyof'`` (the default) requires at least one
    :param resource: resource to test; it is converted with
        ``addressbook_from_resource`` before the child filters are applied
    :return: True when the filter is missing or has no children, False when
        no address book can be obtained from the resource, otherwise the
        combined result of the child filters
    :raises KeyError: if the ``test`` attribute is neither ``'allof'`` nor
        ``'anyof'``
    """
    if el is None or not list(el):
        # Empty filter, let's not bother parsing. Return a plain boolean so
        # every code path yields the same kind of value.
        return True
    ab = addressbook_from_resource(resource)
    if ab is None:
        return False
    test_name = el.get('test', 'anyof')
    test = {'allof': all, 'anyof': any}[test_name]
    return test(apply_prop_filter(subel, ab) for subel in el)
class AddressbookQueryReporter(webdav.Reporter):
name = '{%s}addressbook-query' % NAMESPACE
resource_type = ADDRESSBOOK_RESOURCE_TYPE
data_property = AddressDataProperty()
@webdav.multistatus
async def report(self, environ, body, resources_by_hrefs, properties,
base_href, base_resource, depth):
requested = None
filter_el = None
limit = None
for el in body:
if el.tag in ('{DAV:}prop', '{DAV:}allprop', '{DAV:}propname'):
requested = el
elif el.tag == ('{%s}filter' % NAMESPACE):
filter_el = el
if base_href is not None:
href = urllib.parse.urljoin(ensure_trailing_slash(base_href), href)
et.text = urllib.parse.quote(href)
return et
def read_href_element(et: ET.Element) -> Optional[str]:
    """Extract the decoded path from an element whose text is a URL.

    The text is split into URL components *before* percent-decoding, so an
    escaped ``?`` (``%3F``) or ``#`` (``%23``) remains part of the path
    instead of being mistaken for the start of a query or fragment.

    :param et: element whose text holds the (percent-encoded) href
    :return: the unquoted path component of the href, or None when the
        element has no text
    """
    if et.text is None:
        return None
    parsed_url = urllib.parse.urlsplit(et.text)
    # TODO(jelmer): Check that the hostname matches the local hostname?
    return urllib.parse.unquote(parsed_url.path)
class ExpandPropertyReporter(Reporter):
"""A expand-property reporter.
See https://tools.ietf.org/html/rfc3253, section 3.8
"""
name = '{DAV:}expand-property'
async def _populate(
self, prop_list: ET.Element,
resources_by_hrefs: Callable[
[Iterable[str]], List[Tuple[str, Resource]]],
properties: Dict[str, Property], href: str,
resource: Resource, environ) -> AsyncIterable[Status]:
"""Expand properties for a resource.
:param prop_list: DAV:property elements to retrieve and expand
collection.calendar_query(filter_fn),
collection.subcollections())
async for (href, resource) in webdav.traverse_resource(
base_resource, base_href, depth,
members=members):
# Ideally traverse_resource would only return the right things.
if getattr(resource, 'content_type', None) == 'text/calendar':
propstat = davcommon.get_properties_with_data(
self.data_property, href, resource, properties, environ,
requested)
yield webdav.Status(
href, '200 OK', propstat=[s async for s in propstat])
class CalendarColorProperty(webdav.Property):
    """The ``calendar-color`` property from the Apple iCal namespace.

    Its value is an HTML-style ``#RRGGBB`` color code, carried as CDATA.
    """

    name = '{http://apple.com/ns/ical/}calendar-color'
    resource_type = (CALENDAR_RESOURCE_TYPE, SUBSCRIPTION_RESOURCE_TYPE)

    async def get_value(self, href, resource, el, environ):
        """Write the resource's calendar color into ``el`` as its text."""
        color = resource.get_calendar_color()
        el.text = color

    async def set_value(self, href, resource, el):
        """Store the text of ``el`` as the resource's calendar color."""
        new_color = el.text
        resource.set_calendar_color(new_color)
class SupportedCalendarComponentSetProperty(webdav.Property):
class MaxAttachmentSizeProperty(webdav.Property):
    """The ``max-attachment-size`` property.

    Specified in section 6.2 of the CalDAV managed attachments draft:
    https://tools.ietf.org/id/draft-ietf-calext-caldav-attachments-03.html#rfc.section.6.2
    """

    name = '{%s}max-attachment-size' % NAMESPACE
    resource_type = CALENDAR_RESOURCE_TYPE
    in_allprops = False
    live = True

    async def get_value(self, href, resource, el, environ):
        """Render the resource's maximum attachment size as text of ``el``."""
        size_limit = resource.get_max_attachment_size()
        el.text = str(size_limit)
class ManagedAttachmentsServerURLProperty(webdav.Property):
"""managed-attachments-server-URL property.
https://tools.ietf.org/id/draft-ietf-calext-caldav-attachments-03.html#rfc.section.6.1
"""
name = '{%s}managed-attachments-server-URL' % NAMESPACE
in_allprops = False
async def get_value(self, base_href, resource, el, environ):
# The RFC specifies that this property can be set on a calendar home
# collection.
# However, there is no matching resource type and we don't want to
# force all resources to implement it. So we just check whether the
# attribute is present.
fn = getattr(resource, 'get_managed_attachments_server_url', None)
if fn is None:
name = '{urn:ietf:params:xml:ns:caldav}calendar-timezone'
resource_type = (CALENDAR_RESOURCE_TYPE,
SCHEDULE_INBOX_RESOURCE_TYPE)
in_allprops = False
async def get_value(self, href, resource, el, environ):
    """Set the text of ``el`` to the resource's calendar timezone."""
    timezone = resource.get_calendar_timezone()
    el.text = timezone
async def set_value(self, href, resource, el):
    """Hand the text of ``el`` to the resource as its calendar timezone.

    When ``el`` is None, None is passed to the resource instead.
    """
    new_timezone = None if el is None else el.text
    resource.set_calendar_timezone(new_timezone)
class MinDateTimeProperty(webdav.Property):
    """The CalDAV ``min-date-time`` property.

    Defined in RFC 4791, section 5.2.6
    (https://tools.ietf.org/html/rfc4791).
    """

    name = '{urn:ietf:params:xml:ns:caldav}min-date-time'
    resource_type = (
        CALENDAR_RESOURCE_TYPE,
        SCHEDULE_INBOX_RESOURCE_TYPE,
        SCHEDULE_OUTBOX_RESOURCE_TYPE,
    )
    in_allprops = False
    live = True

    async def get_value(self, href, resource, el, environ):
        """Set the text of ``el`` to the resource's minimum date-time."""
        earliest = resource.get_min_date_time()
        el.text = earliest
"""getctag property
"""
name = '{DAV:}getctag'
class AppleGetCTagProperty(GetCTagProperty):
    """The getctag property under the ``http://calendarserver.org/ns/``
    namespace.

    Everything except the qualified name is inherited from
    ``GetCTagProperty``.
    """

    name = '{http://calendarserver.org/ns/}getctag'
class RefreshRateProperty(Property):
    """refreshrate property.

    (no public documentation, but contains an ical-style frequency indicator)
    """

    name = '{http://calendarserver.org/ns/}refreshrate'
    resource_type = COLLECTION_RESOURCE_TYPE
    in_allprops = False

    async def get_value(self, href, resource, el, environ):
        """Set the text of ``el`` to the resource's refresh rate."""
        el.text = resource.get_refreshrate()

    # Declared as a coroutine like the other property setters in this
    # codebase: a plain function would return None, which cannot be awaited.
    async def set_value(self, href, resource, el):
        """Store the text of ``el`` as the resource's refresh rate."""
        resource.set_refreshrate(el.text)
name = '{urn:ietf:params:xml:ns:caldav}supported-calendar-data'
resource_type = (CALENDAR_RESOURCE_TYPE,
SCHEDULE_INBOX_RESOURCE_TYPE,
SCHEDULE_OUTBOX_RESOURCE_TYPE)
in_allprops = False
async def get_value(self, href, resource, el, environ):
    """Append one ``calendar-data`` child to ``el`` per supported type.

    Each ``(content_type, version)`` pair reported by the resource becomes
    a child element carrying ``content-type`` and ``version`` attributes.
    """
    supported = resource.get_supported_calendar_data_types()
    for content_type, version in supported:
        ET.SubElement(
            el,
            '{urn:ietf:params:xml:ns:caldav}calendar-data',
            {'content-type': content_type, 'version': version},
        )
class CalendarTimezoneProperty(webdav.Property):
    """calendar-timezone property.

    See https://tools.ietf.org/html/rfc4791, section 5.2.2
    """

    name = '{urn:ietf:params:xml:ns:caldav}calendar-timezone'
    resource_type = (CALENDAR_RESOURCE_TYPE,
                     SCHEDULE_INBOX_RESOURCE_TYPE)
    in_allprops = False

    async def get_value(self, href, resource, el, environ):
        """Set the text of ``el`` to the resource's calendar timezone."""
        el.text = resource.get_calendar_timezone()

    async def set_value(self, href, resource, el):
        """Update the resource's calendar timezone from ``el``.

        :param el: element whose text is the new timezone, or None
        """
        if el is not None:
            resource.set_calendar_timezone(el.text)
        else:
            # Without an element there is no text to store; pass None on to
            # the resource rather than silently ignoring the request.
            resource.set_calendar_timezone(None)
"""
name = '{%s}supported-address-data' % NAMESPACE
resource_type = ADDRESSBOOK_RESOURCE_TYPE
in_allprops = False
live = True
async def get_value(self, href, resource, el, environ):
    """Append one ``address-data-type`` child to ``el`` per supported type.

    Each ``(content_type, version)`` pair reported by the resource becomes
    a child element carrying ``content-type`` and ``version`` attributes.
    """
    for (content_type,
         version) in resource.get_supported_address_data_types():
        # RFC 6352, section 6.2.2 defines the children of
        # supported-address-data as address-data-type elements; the media
        # type belongs in the content-type *attribute*, not the tag name.
        # NOTE(review): assumes NAMESPACE is the CardDAV namespace - confirm.
        subel = ET.SubElement(el, '{%s}address-data-type' % NAMESPACE)
        subel.set('content-type', content_type)
        subel.set('version', version)
class MaxResourceSizeProperty(webdav.Property):
    """The ``max-resource-size`` property.

    Defined in RFC 6352, section 6.2.3
    (https://tools.ietf.org/html/rfc6352).
    """

    name = '{%s}max-resource-size' % NAMESPACE
    resource_type = ADDRESSBOOK_RESOURCE_TYPE
    in_allprops = False
    live = True

    async def get_value(self, href, resource, el, environ):
        """Render the resource's maximum resource size as text of ``el``."""
        size_limit = resource.get_max_resource_size()
        el.text = str(size_limit)
class MaxImageSizeProperty(webdav.Property):
"""Provides the max-image-size property.