Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _getResByPath(self, path):
"""Return _VirtualResource object for a path.
path is expected to be
categoryType/category/name
for example:
'ou/development/spec.doc'
"""
# NOTE(review): this listing has lost its indentation and is cut off after the
# title-matching loop at the bottom; the remainder of the function is not
# visible here.
# Backslashes are never expected in a path handed to this method.
assert not "\\" in path
# Split the path into (at most) four segments: category type, category,
# resource name and a trailing content specifier.
# NOTE(review): saveSplit presumably pads missing segments with empty values
# so the 4-way unpacking always succeeds — confirm against util.
catType, cat, resName, contentSpec = util.saveSplit(path.strip("/"), "/", 3)
# print "_getResByPath('%s'): catType=%s, cat=%s, resName=%s" % (path, catType, cat, resName)
# A non-empty category type must be one of the known ones.
if catType and catType not in _alllowedCategories:
raise DAVError(HTTP_NOT_FOUND, "Unknown category type '%s'" % catType)
if catType == _realCategory:
# Accessing /by_key/<key> or /by_key/<key>/content
for data in _resourceData:
if data["key"] == cat:
if resName == "content":
return VirtualResFile(data)
return VirtualResource(data)
# No resource record carries the requested key.
raise DAVError(HTTP_NOT_FOUND, "Unknown key '%s'" % cat)
elif resName:
# Accessing /<catType>/<cat>/<resName> or /<catType>/<cat>/<resName>/??
res = None
# Look for a record whose category value and title both match.
for data in _resourceData:
if data.get(catType) == cat and data["title"] == resName:
res = data
def handle_delete(self):
    """Change semantic of DELETE to remove resource tags.

    DELETE is only honoured for paths below the '/by_tag/' collection.
    Instead of deleting the resource, the tag addressed by the path is
    removed from this resource's tag list.

    Returns:
        True, after the tag was removed from ``self.data["tags"]``.

    Raises:
        DAVError: HTTP_FORBIDDEN if the path is not of the form
            '/by_tag/<tag>/<rest>', or if this resource does not carry
            the addressed tag.
    """
    # DELETE is only supported for the '/by_tag/' collection
    if "/by_tag/" not in self.path:
        raise DAVError(HTTP_FORBIDDEN)

    # path must be '/by_tag/<tag>/<rest>'
    catType, tag, _rest = util.save_split(self.path.strip("/"), "/", 2)

    # Validate with explicit errors rather than `assert`: assertions are
    # stripped under `python -O`, and an AssertionError would otherwise
    # surface to the client as an internal server error.
    if catType != "by_tag":
        raise DAVError(HTTP_FORBIDDEN)
    tags = self.data["tags"]
    if tag not in tags:
        raise DAVError(HTTP_FORBIDDEN)

    tags.remove(tag)
    return True  # OK
"""Set or remove property value.
See DAVResource.set_property_value()
"""
if value is None:
# We can never remove properties
raise DAVError(HTTP_FORBIDDEN)
if name == "{virtres:}tags":
# value is of type etree.Element
self.data["tags"] = value.text.split(",")
elif name == "{virtres:}description":
# value is of type etree.Element
self.data["description"] = value.text
elif name in VirtualResource._supportedProps:
# Supported property, but read-only
raise DAVError(
HTTP_FORBIDDEN, err_condition=PRECONDITION_CODE_ProtectedProperty
)
else:
# Unsupported property
raise DAVError(HTTP_FORBIDDEN)
# Write OK
return
def set_property_value(self, name, value, dry_run=False):
    """Reject every attempt to set or remove a property.

    See DAVResource.set_property_value()

    Raises:
        DAVError: always, with HTTP_FORBIDDEN; none of the arguments
            is inspected.
    """
    # Properties cannot be written or removed here, whatever is requested.
    raise DAVError(HTTP_FORBIDDEN)
# NOTE(review): headerless fragment — it starts in the middle of a function
# (the `elif` below has no visible `if`) and the original indentation is lost.
# It uses `cat`, `resName`, `catType` and `contentSpec`, which are not defined
# in this fragment; presumably this is the body of _getResByPath() — confirm.
# Accessing /by_key/<key> or /by_key/<key>/content
for data in _resourceData:
if data["key"] == cat:
if resName == "content":
return VirtualResFile(data)
return VirtualResource(data)
# No resource record carries the requested key.
raise DAVError(HTTP_NOT_FOUND, "Unknown key '%s'" % cat)
elif resName:
# Accessing /<catType>/<cat>/<resName> or /<catType>/<cat>/<resName>/??
res = None
# Take the first record whose category value and title both match.
for data in _resourceData:
if data.get(catType) == cat and data["title"] == resName:
res = data
break
if not res:
raise DAVError(HTTP_NOT_FOUND)
if contentSpec == _contentSegment:
# Accessing /<catType>/<cat>/<resName>/content
return VirtualResFile(res)
elif contentSpec:
# Accessing /<catType>/<cat>/<resName>/??
raise DAVError(HTTP_NOT_FOUND)
# Accessing /<catType>/<cat>/<resName>
return VirtualResource(res)
elif cat:
# Accessing /catType/cat: return list of matching names
resList = [ data["title"] for data in _resourceData if data.get(catType) == cat ]
return VirtualResCollection(resList)
elif catType:
# Accessing /catType/: return all possible values for this catType
# NOTE(review): headerless fragment — the enclosing function, the condition
# that leads into the first block, and the original indentation are not
# visible. `environ`, `dav_res`, `last_modified` and `ifModifiedSinceFailed`
# are defined outside this fragment.
# ETag matched. If it's a GET request and we don't have a
# conflicting If-Modified header, we return NOT_MODIFIED
if (
environ["REQUEST_METHOD"] in ("GET", "HEAD")
and not ifModifiedSinceFailed
):
raise DAVError(HTTP_NOT_MODIFIED, "If-None-Match header failed")
raise DAVError(
HTTP_PRECONDITION_FAILED, "If-None-Match header condition failed"
)
# From here on a failed If-Modified-Since check is no longer reported
# (see the final check below).
ignoreIfModifiedSince = True
if "HTTP_IF_UNMODIFIED_SINCE" in environ and dav_res.support_modified():
ifunmodtime = parse_time_string(environ["HTTP_IF_UNMODIFIED_SINCE"])
# A falsy parse result skips the check.
# NOTE(review): presumably parse_time_string returns a falsy value for
# unparsable headers — confirm.
if ifunmodtime and ifunmodtime < last_modified:
raise DAVError(
HTTP_PRECONDITION_FAILED, "If-Unmodified-Since header condition failed"
)
if ifModifiedSinceFailed and not ignoreIfModifiedSince:
raise DAVError(HTTP_NOT_MODIFIED, "If-Modified-Since header condition failed")
return
# NOTE(review): headerless fragment — the `elif` chain below continues an `if`
# that is not visible here and the original indentation is lost; presumably
# this is the tail of _getResByPath() — confirm against the full file.
elif cat:
# Accessing /catType/cat: return list of matching names
resList = [ data["title"] for data in _resourceData if data.get(catType) == cat ]
return VirtualResCollection(resList)
elif catType:
# Accessing /catType/: return all possible values for this catType
if catType in _browsableCategories:
# Collect the distinct values in first-seen order.
# NOTE(review): data[catType] raises KeyError for records lacking this key,
# whereas the branch above uses data.get(catType) — confirm every record
# defines all browsable categories.
resList = []
for data in _resourceData:
if not data[catType] in resList:
resList.append(data[catType])
return VirtualResCollection(resList)
# Known category type, but not browsable (e.g. 'by_key')
raise DAVError(HTTP_FORBIDDEN)
# Accessing /: return list of categories
return VirtualResCollection(_browsableCategories)
def move_recursive(self, dest_path):
    """Move the file-system entry behind this resource to `dest_path`.

    See DAVResource.move_recursive()

    The destination file path is resolved through the provider, the entry
    at `self._file_path` is moved there with `shutil.move`, and, if the
    provider has a property manager, the dead properties are moved along
    (including those of children).

    Raises:
        DAVError: HTTP_FORBIDDEN if the provider is read-only.
    """
    provider = self.provider
    if provider.readonly:
        raise DAVError(HTTP_FORBIDDEN)

    target_file_path = provider._loc_to_file_path(dest_path, self.environ)

    # Preconditions: the destination must not be this resource or one of
    # its children, and nothing may exist at the target location yet.
    assert not util.is_equal_or_child_uri(self.path, dest_path)
    assert not os.path.exists(target_file_path)

    _logger.debug("move_recursive({}, {})".format(self._file_path, target_file_path))
    shutil.move(self._file_path, target_file_path)

    # (Live properties are copied by copy2 or copystat)
    # Dead properties are kept by the property manager and have to be
    # moved separately; without a manager there is nothing left to do.
    prop_manager = provider.prop_manager
    if not prop_manager:
        return

    dest_res = provider.get_resource_inst(dest_path, self.environ)
    prop_manager.move_properties(
        self.get_ref_url(),
        dest_res.get_ref_url(),
        with_children=True,
        environ=self.environ,
    )
# NOTE(review): headerless fragment — it starts inside a `try` whose opening
# (and the parsing of `content_length`) is not visible, and the original
# indentation is lost. `environ` and `allow_empty` are defined outside this
# fragment.
if content_length < 0:
raise DAVError(HTTP_BAD_REQUEST, "Negative content-length.")
except ValueError:
raise DAVError(HTTP_BAD_REQUEST, "content-length is not numeric.")
if content_length == 0:
requestbody = ""
else:
# Read exactly the announced number of bytes from the WSGI input stream.
requestbody = environ["wsgi.input"].read(content_length)
# NOTE(review): presumably signals to later stages that the request body
# has been consumed — confirm where this key is read.
environ["wsgidav.all_input_read"] = 1
if requestbody == "":
if allow_empty:
return None
else:
raise DAVError(HTTP_BAD_REQUEST, "Body must not be empty.")
try:
rootEL = etree.fromstring(requestbody)
except Exception as e:
# Any parser failure is reported to the client as a bad request; the
# original exception is passed along as src_exception.
raise DAVError(HTTP_BAD_REQUEST, "Invalid XML format.", src_exception=e)
# If dumps of the body are desired, then this is the place to do it pretty:
if environ.get("wsgidav.dump_request_body"):
_logger.info(
"{} XML request body:\n{}".format(
environ["REQUEST_METHOD"],
compat.to_native(xml_to_bytes(rootEL, pretty_print=True)),
)
)
# Clear the flag after dumping.
# NOTE(review): presumably so the body is not dumped a second time — confirm.
environ["wsgidav.dump_request_body"] = False
def delete(self):
    """Delete the file behind this resource, then its properties and locks.

    See DAVResource.delete()

    Raises:
        DAVError: HTTP_FORBIDDEN if the provider is read-only.
    """
    if self.provider.readonly:
        raise DAVError(HTTP_FORBIDDEN)

    # Unlink the file first; if that fails, properties and locks stay intact.
    os.unlink(self._filePath)

    # NOTE(review): the `True` argument is presumably the 'recursive' flag of
    # these helpers — confirm against their definitions.
    self.removeAllProperties(True)
    self.removeAllLocks(True)