Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# --- join_uri: appends the extra segments to the base URI, separated by "/".
# A base of "/" or "" both produce a rooted result ("/c").
assert join_uri("/a/b", "c", "d") == "/a/b/c/d"
assert join_uri("a/b", "c", "d") == "a/b/c/d"
assert join_uri("/", "c") == "/c"
assert join_uri("", "c") == "/c"
# --- is_child_uri(parent, child): true only for strict descendants.
# The parent itself (with or without trailing slash), an ancestor ("/a/"),
# and a sibling that merely shares a name prefix ("/a/bc") are all rejected.
assert not is_child_uri("/a/b", "/a/")
assert not is_child_uri("/a/b", "/a/b")
assert not is_child_uri("/a/b", "/a/b/")
assert not is_child_uri("/a/b", "/a/bc")
assert not is_child_uri("/a/b", "/a/bc/")
assert is_child_uri("/a/b", "/a/b/c")
assert is_child_uri("/a/b", "/a/b/c")
# --- is_equal_or_child_uri(parent, child): like is_child_uri, but the
# parent itself (with or without trailing slash) is accepted as well.
assert not is_equal_or_child_uri("/a/b", "/a/")
assert is_equal_or_child_uri("/a/b", "/a/b")
assert is_equal_or_child_uri("/a/b", "/a/b/")
assert not is_equal_or_child_uri("/a/b", "/a/bc")
assert not is_equal_or_child_uri("/a/b", "/a/bc/")
assert is_equal_or_child_uri("/a/b", "/a/b/c")
assert is_equal_or_child_uri("/a/b", "/a/b/c")
# --- lstripstr(s, prefix[, flag]): removes a leading prefix.
# Matching is case-sensitive unless the third argument is True
# (presumably an "ignore case" flag -- confirm against the definition).
assert lstripstr("/dav/a/b", "/dav") == "/a/b"
assert lstripstr("/dav/a/b", "/DAV") == "/dav/a/b"
assert lstripstr("/dav/a/b", "/DAV", True) == "/a/b"
# --- pop_path(path): returns (first_segment, remainder).
# The remainder is "/" once the last segment was popped; "/" and "" both
# yield ("", "").
assert pop_path("/a/b/c") == ("a", "/b/c")
assert pop_path("/a/b/") == ("a", "/b/")
assert pop_path("/a/") == ("a", "/")
assert pop_path("/a") == ("a", "/")
assert pop_path("/") == ("", "")
assert pop_path("") == ("", "")
# --- The following run repeats the join_uri / is_child_uri /
# is_equal_or_child_uri / lstripstr / pop_path assertions above verbatim.
assert join_uri("/", "c") == "/c"
assert join_uri("", "c") == "/c"
assert not is_child_uri("/a/b", "/a/")
assert not is_child_uri("/a/b", "/a/b")
assert not is_child_uri("/a/b", "/a/b/")
assert not is_child_uri("/a/b", "/a/bc")
assert not is_child_uri("/a/b", "/a/bc/")
assert is_child_uri("/a/b", "/a/b/c")
assert is_child_uri("/a/b", "/a/b/c")
assert not is_equal_or_child_uri("/a/b", "/a/")
assert is_equal_or_child_uri("/a/b", "/a/b")
assert is_equal_or_child_uri("/a/b", "/a/b/")
assert not is_equal_or_child_uri("/a/b", "/a/bc")
assert not is_equal_or_child_uri("/a/b", "/a/bc/")
assert is_equal_or_child_uri("/a/b", "/a/b/c")
assert is_equal_or_child_uri("/a/b", "/a/b/c")
assert lstripstr("/dav/a/b", "/dav") == "/a/b"
assert lstripstr("/dav/a/b", "/DAV") == "/dav/a/b"
assert lstripstr("/dav/a/b", "/DAV", True) == "/a/b"
assert pop_path("/a/b/c") == ("a", "/b/c")
assert pop_path("/a/b/") == ("a", "/b/")
assert pop_path("/a/") == ("a", "/")
assert pop_path("/a") == ("a", "/")
assert pop_path("/") == ("", "")
assert pop_path("") == ("", "")
# --- shift_path(head, tail): moves the first segment of `tail` onto `head`
# and returns (segment, new_head, new_tail).
self.assertEqual(shift_path("", "/a/b/c"), ("a", "/a", "/b/c"))
self.assertEqual(shift_path("/a", "/b/c"), ("b", "/a/b", "/c"))
def move_properties(self, src_url, dest_url, with_children, environ=None):
    """Re-key the stored properties of ``src_url`` to ``dest_url``.

    The whole operation runs under the manager's write lock, and the
    backing store is synced afterwards.

    Args:
        src_url: Key (reference URL) the properties are currently stored
            under.
        dest_url: Key the properties are moved to.
        with_children: If true, every stored key for which
            ``util.is_equal_or_child_uri(src_url, key)`` holds is moved,
            with its leading ``src_url`` replaced by ``dest_url``.
            Otherwise only the entry for ``src_url`` itself is moved
            (if there is one).
        environ: Accepted for interface compatibility; not used here.
    """
    _logger.debug(
        "move_properties({}, {}, {})".format(src_url, dest_url, with_children)
    )
    self._lock.acquire_write()
    try:
        if __debug__ and self._verbose >= 4:
            self._check()
        if not self._loaded:
            self._lazy_open()
        if with_children:
            # Move src_url\*
            # Iterate over a snapshot, because the mapping is modified
            # inside the loop.
            for url in list(self._dict.keys()):
                if util.is_equal_or_child_uri(src_url, url):
                    # Rewrite only the first (leading) occurrence: an
                    # unbounded replace() would also rewrite later path
                    # segments that happen to repeat src_url
                    # (e.g. '/a' -> '/x' must not turn '/a/b/a/c' into
                    # '/x/b/x/c').
                    d = url.replace(src_url, dest_url, 1)
                    # pop-then-assign keeps the entry when d == url;
                    # assign-then-delete would silently drop it.
                    self._dict[d] = self._dict.pop(url)
        elif src_url in self._dict:
            # Move src_url only
            self._dict[dest_url] = self._dict.pop(src_url)
        self._sync()
        if __debug__ and self._verbose >= 4:
            self._check("after move")
    finally:
        # Always release the write lock, even if the move failed.
        self._lock.release()
def copy_move_single(self, dest_path, is_move):
"""See DAVResource.copy_move_single() """
if self.provider.readonly:
raise DAVError(HTTP_FORBIDDEN)
fpDest = self.provider._loc_to_file_path(dest_path, self.environ)
assert not util.is_equal_or_child_uri(self.path, dest_path)
# Copy file (overwrite, if exists)
shutil.copy2(self._file_path, fpDest)
# (Live properties are copied by copy2 or copystat)
# Copy dead properties
propMan = self.provider.prop_manager
if propMan:
destRes = self.provider.get_resource_inst(dest_path, self.environ)
if is_move:
propMan.move_properties(
self.get_ref_url(),
destRes.get_ref_url(),
with_children=False,
environ=self.environ,
)
else:
propMan.copy_properties(
def move_recursive(self, dest_path):
    """See DAVResource.move_recursive().

    Moves this resource's file system entry to the location mapped from
    ``dest_path`` and, when the provider has a property manager, moves
    the stored properties of this resource and its children as well.

    Raises:
        DAVError: ``HTTP_FORBIDDEN`` if the provider is read-only.
    """
    provider = self.provider
    if provider.readonly:
        raise DAVError(HTTP_FORBIDDEN)

    target_file_path = provider._loc_to_file_path(dest_path, self.environ)

    # Preconditions: the destination must not be the source itself or
    # lie below it, and nothing may exist at the target location yet.
    assert not util.is_equal_or_child_uri(self.path, dest_path)
    assert not os.path.exists(target_file_path)

    _logger.debug(
        "move_recursive({}, {})".format(self._file_path, target_file_path)
    )
    shutil.move(self._file_path, target_file_path)

    # The file system move is done; what remains is re-keying the dead
    # properties, which only applies when a property manager is present.
    prop_manager = provider.prop_manager
    if not prop_manager:
        return

    dest_resource = provider.get_resource_inst(dest_path, self.environ)
    prop_manager.move_properties(
        self.get_ref_url(),
        dest_resource.get_ref_url(),
        with_children=True,
        environ=self.environ,
    )
# http://www.webdav.org/specs/rfc4918.html#rfc.section.9.8.5
error_list.append((sRes.get_href(), as_DAVError(e)))
# MOVE: Remove source tree (bottom-up)
if is_move:
reverseSrcList = srcList[:]
reverseSrcList.reverse()
_logger.debug("Delete after move, ignoreDict={}".format(ignoreDict))
for sRes in reverseSrcList:
# Non-collections have already been removed in the copy loop.
if not sRes.is_collection:
continue
# Skip collections that contain errors (unmoved resources)
childError = False
for ignorePath in ignoreDict.keys():
if util.is_equal_or_child_uri(sRes.path, ignorePath):
childError = True
break
if childError:
_logger.debug(
"Delete after move: skipping '{}', because of child error".format(
sRes.path
)
)
continue
try:
_logger.debug("Remove collection after move: {}".format(sRes))
sRes.delete()
except Exception as e:
_debug_exception(e)
error_list.append((srcRes.get_href(), as_DAVError(e)))
# http://www.webdav.org/specs/rfc4918.html#rfc.section.7.4
if is_move:
self._check_write_permission(srcRes, "infinity", environ)
# Cannot remove members from locked-0 collections
if srcParentRes:
self._check_write_permission(srcParentRes, "0", environ)
# Cannot create new members in locked-0 collections
if not destExists:
self._check_write_permission(destParentRes, "0", environ)
# If target exists, it must not be locked
self._check_write_permission(destRes, "infinity", environ)
if srcPath == destPath:
self._fail(HTTP_FORBIDDEN, "Cannot copy/move source onto itself")
elif util.is_equal_or_child_uri(srcPath, destPath):
self._fail(HTTP_FORBIDDEN, "Cannot copy/move source below itself")
if destExists and environ["HTTP_OVERWRITE"] != "T":
self._fail(
HTTP_PRECONDITION_FAILED,
"Destination already exists and Overwrite is set to false",
)
# --- Let provider handle the request natively ------------------------
# Errors in copy/move: [ (<href>, <DAVError>), ... ]
error_list = []
success_code = HTTP_CREATED
if destExists:
success_code = HTTP_NO_CONTENT
# We get here, if
# - the provider does not support recursive moves
# - this is a copy request
# In this case we would probably not win too much by a native provider
# implementation, since we had to handle single child errors anyway.
# - the source tree is partially locked
# We would have to pass this information to the native provider.
# Hidden paths (paths of failed copy/moves) {<path>: True, ...}
ignoreDict = {}
for sRes in srcList:
# Skip this resource, if there was a failure copying a parent
parentError = False
for ignorePath in ignoreDict.keys():
if util.is_equal_or_child_uri(ignorePath, sRes.path):
parentError = True
break
if parentError:
_logger.debug(
"Copy: skipping '{}', because of parent error".format(sRes.path)
)
continue
try:
relUrl = sRes.path[srcRootLen:]
dPath = destPath + relUrl
self._evaluate_if_headers(sRes, environ)
# We copy resources and their properties top-down.
# Collections are simply created (without members), for