Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _init(self, path, fs=None):
"""
path (str): The top-level path of the cache in the filesystem.
fs (FileSystemClient, str): The filesystem client to use as the
datastore of this cache. If not specified, this will default to the
local filesystem using `LocalFsClient`. If specified as a string,
and connected to a `DuctRegistry`, upon first use an attempt will be
made to look up a `FileSystemClient` instance in the registry by
this name.
"""
self.fs = fs or LocalFsClient()
self.path = path
# Currently config is not used, but will be in future versions
self._config = None
self.connection_fields += ('fs',)
copied instead of copying the entire folder. If `dest` is
otherwise a directory, an exception will be raised.
overwrite (bool): `True` if the contents of any existing file by the
same name should be overwritten, `False` otherwise.
fs (FileSystemClient): The FileSystemClient into which the nominated
file/folder `source` should be downloaded. If not specified,
defaults to the local filesystem.
SSHClient Quirks:
This method is overloaded so that remote-to-local downloads can be
handled specially using `scp`. Downloads to any non-local filesystem
are handled using the standard implementation.
"""
from ..filesystems.local import LocalFsClient
if fs is None or isinstance(fs, LocalFsClient):
logger.info('Copying file to local...')
dest = dest or posixpath.basename(source)
cmd = (
"scp -r -o ControlPath={socket} {login}:'{remote_file}' '{local_file}'".format(
socket=self._socket_path,
login=self._login_info,
remote_file=dest.replace('"', r'\"'),
local_file=source.replace('"', r'\"'), # quote escaped for bash
)
)
proc = run_in_subprocess(cmd, check_output=True)
logger.info(proc.stderr or 'Success')
else:
return super(RemoteClient, self).download(source, dest, overwrite, fs)
def get_host_and_set_namenodes(duct, cluster, conf_path):
conf_parser = CdhHdfsConfParser(duct.remote or LocalFsClient(), conf_path=conf_path)
duct.namenodes = conf_parser.namenodes(cluster)
return random.choice(duct.namenodes)
and corresponds to a directory, the contents of source will be
copied instead of copying the entire folder.
dest (str): The destination path on this filesystem. If not
specified, the file/folder is uploaded into the default path,
usually one's home folder, on this filesystem. If `dest` ends
with '/' then file will be copied into destination folder, and
will throw an error if path does not resolve to a directory.
overwrite (bool): `True` if the contents of any existing file by the
same name should be overwritten, `False` otherwise.
fs (FileSystemClient): The FileSystemClient from which to load the
file/folder at `source`. If not specified, defaults to the local
filesystem.
"""
if fs is None:
from .local import LocalFsClient
fs = LocalFsClient()
return fs.download(source, dest, overwrite, self)
Args:
file (str, file-like-object): The path of the file containing the
query statement to be executed against the database, or an open
file-like resource.
fs (None, FileSystemClient): The filesystem wihin which the
nominated file should be found. If `None`, the local filesystem
will be used.
**kwargs (dict): Extra keyword arguments to pass on to
`DatabaseClient.execute`.
Returns:
DBAPI2 cursor: A DBAPI2 compatible cursor instance.
"""
close_later = False
if isinstance(file, str):
file = (fs or LocalFsClient()).open(file, 'r')
close_later = True
try:
return self.execute(file.read(), **kwargs)
finally:
if close_later:
file.close()
usually one's home folder. If `dest` ends with '/',
and corresponds to a directory, the contents of source will be
copied instead of copying the entire folder. If `dest` is
otherwise a directory, an exception will be raised.
overwrite (bool): `True` if the contents of any existing file by the
same name should be overwritten, `False` otherwise.
fs (FileSystemClient): The FileSystemClient into which the nominated
file/folder `source` should be downloaded. If not specified,
defaults to the local filesystem.
"""
# TODO: Consider integration with `odo` for optimised data transfers.
if fs is None:
from .local import LocalFsClient
fs = LocalFsClient()
source = self._path(source)
dest = fs._path(dest or self.path_basename(source))
if dest.endswith(fs.path_separator):
assert fs.isdir(dest), "No such directory `{}`".format(dest)
if not source.endswith(self.path_separator):
dest = fs.path_join(fs._path(dest), self.path_basename(source))
# A mapping of source to dest paths on the respective filesystems
# In format: (source, dest, isdir?)
targets = []
if self.isdir(source):
target_prefix = (
source if source.endswith(self.path_separator) else source + self.path_separator
Args:
statement (str): The statement to be executed against the database.
file (str, file-like-object): The filename where the data should be
written, or an open file-like resource.
format (str): The format to be used ('csv' by default). Format
options can be passed via `**kwargs`.
fs (None, FileSystemClient): The filesystem wihin which the
nominated file should be found. If `None`, the local filesystem
will be used.
**kwargs: Additional keyword arguments to pass onto
`DatabaseClient.stream`.
"""
close_later = False
if isinstance(file, str):
file = (fs or LocalFsClient()).open(file, 'w')
close_later = True
try:
file.writelines(self.stream(statement, format=format, **kwargs))
finally:
if close_later:
file.close()
def _prepare(self):
assert self.remote is None, "LocalFsClient cannot be used in conjunction with a remote client."
super(LocalFsClient, self)._prepare()
with '/' then file will be copied into destination folder, and
will throw an error if path does not resolve to a directory.
overwrite (bool): `True` if the contents of any existing file by the
same name should be overwritten, `False` otherwise.
fs (FileSystemClient): The FileSystemClient from which to load the
file/folder at `source`. If not specified, defaults to the local
filesystem.
SSHClient Quirks:
This method is overloaded so that local-to-remote uploads can be
handled specially using `scp`. Uploads to any non-local filesystem
are handled using the standard implementation.
"""
from ..filesystems.local import LocalFsClient
if fs is None or isinstance(fs, LocalFsClient):
logger.info('Copying file from local...')
dest = dest or posixpath.basename(source)
cmd = (
"scp -r -o ControlPath={socket} '{local_file}' {login}:'{remote_file}'".format(
socket=self._socket_path,
local_file=source.replace('"', r'\"'), # quote escaped for bash
login=self._login_info,
remote_file=dest.replace('"', r'\"'),
)
)
proc = run_in_subprocess(cmd, check_output=True)
logger.info(proc.stderr or 'Success')
else:
return super(RemoteClient, self).upload(source, dest, overwrite, fs)
Args:
file (str, file-like-object): The path of the file containing the
query statement to be executed against the database, or an open
file-like resource.
fs (None, FileSystemClient): The filesystem wihin which the
nominated file should be found. If `None`, the local filesystem
will be used.
**kwargs (dict): Extra keyword arguments to pass on to
`DatabaseClient.query`.
Returns:
object: The results of the query formatted as nominated.
"""
close_later = False
if isinstance(file, str):
file = (fs or LocalFsClient()).open(file, 'r')
close_later = True
try:
return self.query(file.read(), **kwargs)
finally:
if close_later:
file.close()