Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if not patlist[0]:
basedir = b'/'
patlist = patlist[1:]
else:
basedir = None
names = []
try:
yield from self._glob(fs, basedir, patlist, decode, names)
if names:
result.extend(names)
else:
raise SFTPError(FX_NO_SUCH_FILE, 'No matches found')
except (OSError, SFTPError) as exc:
# pylint: disable=attribute-defined-outside-init
exc.srcpath = pattern
if error_handler:
error_handler(exc)
else:
raise exc
return result
def _copy(self, srcfs, dstfs, srcpath, dstpath, preserve,
recurse, follow_symlinks, error_handler):
"""Copy a file, directory, or symbolic link"""
if follow_symlinks:
srcattrs = yield from srcfs.stat(srcpath)
else:
srcattrs = yield from srcfs.lstat(srcpath)
try:
if stat.S_ISDIR(srcattrs.permissions):
if not recurse:
raise SFTPError(FX_FAILURE, '%s is a directory' %
srcpath.decode('utf-8', errors='replace'))
if not (yield from dstfs.isdir(dstpath)):
yield from dstfs.mkdir(dstpath)
names = yield from srcfs.listdir(srcpath)
for name in names:
if name in (b'.', b'..'):
continue
srcfile = srcfs.compose_path(name, parent=srcpath)
dstfile = dstfs.compose_path(name, parent=dstpath)
yield from self._copy(srcfs, dstfs, srcfile,
dstfile, preserve, recurse,
handle = packet.get_string()
attrs = SFTPAttrs.decode(packet)
packet.check_end()
file_obj = self._file_handles.get(handle)
if file_obj:
result = self._server.fsetstat(file_obj, attrs)
if asyncio.iscoroutine(result):
result = yield from result
return result
else:
raise SFTPError(FX_FAILURE, 'Invalid file handle')
def encode(self, path):
    """Encode path name using configured path encoding

       Strings are encoded to bytes using the configured path
       encoding and error handling. :class:`pathlib.PurePath` objects
       are first converted to strings and then encoded the same way.
       This method has no effect if the path is already bytes.

       :param path:
           The path name to encode
       :type path: `str`, `bytes`, or :class:`pathlib.PurePath`

       :returns: The encoded path; a value which is neither a string
                 nor a :class:`pathlib.PurePath` is returned unchanged

       :raises: :exc:`SFTPError` if the path needs to be encoded but
                no path encoding is set

    """

    # Imported here so this method does not rely on a module-level
    # import which may not be present.
    from pathlib import PurePath

    # Path objects have no encode() method of their own, so reduce
    # them to their string form and share the str handling below.
    if isinstance(path, PurePath):
        path = str(path)

    if isinstance(path, str):
        if not self._path_encoding:
            raise SFTPError(FX_BAD_MESSAGE, 'Path must be bytes when '
                            'encoding is not set')

        path = path.encode(self._path_encoding, self._path_errors)

    return path
def fsync(self, handle):
    """Make an SFTP fsync request

       Sends the `fsync@openssh.com` request for the given handle
       and returns the result of that request.

       :param handle:
           The file handle to pass along with the request

       :raises: :exc:`SFTPError` with code `FX_OP_UNSUPPORTED` if
                fsync support has not been flagged on this object

    """

    # Refuse up front rather than sending a request that is not
    # flagged as supported.
    if not self._supports_fsync:
        raise SFTPError(FX_OP_UNSUPPORTED, 'fsync not supported')

    response = yield from self._make_request(b'fsync@openssh.com',
                                             String(handle))

    return response
def _process_packet(self, pkttype, pktid, packet):
"""Process incoming SFTP requests"""
# pylint: disable=broad-except
try:
if pkttype == FXP_EXTENDED:
pkttype = packet.get_string()
handler = self._packet_handlers.get(pkttype)
if not handler:
raise SFTPError(FX_OP_UNSUPPORTED,
'Unsupported request type: %s' % pkttype)
return_type = self._return_types.get(pkttype, FXP_STATUS)
result = yield from handler(self, packet)
if return_type == FXP_STATUS:
result = UInt32(FX_OK) + String('') + String('')
elif return_type in (FXP_HANDLE, FXP_DATA):
result = String(result)
elif return_type == FXP_NAME:
result = (UInt32(len(result)) +
b''.join(name.encode() for name in result))
else:
if isinstance(result, os.stat_result):
result = SFTPAttrs.from_local(result)
elif isinstance(result, os.statvfs_result):
new path is removed before the rename.
:param bytes oldpath:
The path of the file, directory, or link to rename
:param bytes newpath:
The new name for this file, directory, or link
:raises: :exc:`SFTPError` to return an error to the client
"""
oldpath = self.map_path(oldpath)
newpath = self.map_path(newpath)
if os.path.exists(newpath):
raise SFTPError(FX_FAILURE, 'File already exists')
return os.rename(oldpath, newpath)
handle = packet.get_string()
packet.check_end()
file_obj = self._file_handles.pop(handle, None)
if file_obj:
result = self._server.close(file_obj)
if asyncio.iscoroutine(result):
yield from result
return
if self._dir_handles.pop(handle, None) is not None:
return
raise SFTPError(FX_FAILURE, 'Invalid file handle')
path = path.encode('utf-8')
command = (b'scp ' + (b'-f ' if source else b'-t ') +
(b'-d ' if must_be_dir else b'') +
(b'-p ' if preserve else b'') +
(b'-r ' if recurse else b'') + path)
conn.logger.get_child('sftp').info('Starting remote SCP, args: %s',
command[4:])
writer, reader, _ = await conn.open_session(command, encoding=None)
return reader, writer
class SCPError(SFTPError):
    """SCP error

       The reason and path may each be given as either a string or a
       byte string; byte strings are decoded as UTF-8 with
       undecodable bytes replaced. When a non-empty path is given, it
       is appended to the reason after a ``': '`` separator before
       the error is passed on to :class:`SFTPError`.

       The `fatal` and `suppress_send` arguments are stored unchanged
       as attributes of the same name.

    """

    def __init__(self, code, reason, path=None, fatal=False,
                 suppress_send=False, lang=DEFAULT_LANG):
        def _as_text(value):
            """Decode a byte string, leaving any other value as is"""

            if isinstance(value, bytes):
                return value.decode('utf-8', errors='replace')

            return value

        reason = _as_text(reason)
        path = _as_text(path)

        # Only mention the path when one was actually provided
        message = reason + ': ' + path if path else reason

        super().__init__(code, message, lang)

        self.fatal = fatal
        self.suppress_send = suppress_send