Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
)
except:
logger.warning("Failed to save results to cache. If needed, please save them manually.")
if config.cache_fail_hard:
six.reraise(*sys.exc_info())
return value # As a last resort, return value object (which could be mutated by serialization).
return wrapped
class Cache(Duct):
"""
An abstract class providing the common API for all cache clients.
"""
DUCT_TYPE = Duct.Type.CACHE
@quirk_docs('_init', mro=True)
def __init__(self, **kwargs):
Duct.__init_with_kwargs__(self, kwargs)
self._init(**kwargs)
@abstractmethod
def _init(self):
pass
# Data insertion and retrieval
@require_connection
def set(self, key, value, namespace=None, serializer=None, metadata=None):
"""
Set the value of a key.
Args:
namespace (dict, None): The namespace to populate. If using from a
module you can pass `globals()`. If `None`, a new dictionary is
created, populated and then returned.
names (list, None): The names to include in the population. If
not specified then all names will be exported.
kinds (list, None): The kinds of ducts to include in the
population. If not specified, all kinds will be exported.
Returns:
dict: The populated namespace.
"""
if namespace is None:
namespace = {}
if kinds is not None:
kinds = [Duct.Type(kind) if not isinstance(kind, Duct.Type) else kind for kind in kinds]
for name, duct in self._registry.items():
if (kinds is None or duct.DUCT_TYPE in kinds) and (names is None or name in names):
namespace[name.split('/')[-1]] = duct
return namespace
Look up an existing registered `Duct` by name and (optionally) kind.
Args:
name (str): The name of the `Duct` instance.
kind (str, Duct.Type): The kind of `Duct` to which the lookup should
be restricted.
Returns:
`Duct`: The looked up `Duct` instance.
Raises:
DuctNotFound: If no `Duct` can be found for requested name and/or
type.
"""
if kind and not isinstance(kind, Duct.Type):
kind = Duct.Type(kind)
if name not in self._registry:
raise DuctNotFound(name)
duct = self._registry[name]
if kind and duct.DUCT_TYPE != kind:
raise DuctNotFound("Duct named '{}' exists, but is not of kind '{}'.".format(name, kind.value))
return duct
class RemoteClient(FileSystemClient):
"""
An abstract class providing the common API for all remote clients.
Attributes:
smartcard (dict): Mapping of smartcard names to system libraries
compatible with `ssh-add -s '' ...`.
"""
__doc_attrs = """
smartcard (dict): Mapping of smartcard names to system libraries
compatible with `ssh-add -s '' ...`.
"""
DUCT_TYPE = Duct.Type.REMOTE
DEFAULT_PORT = None
@quirk_docs('_init', mro=True)
def __init__(self, smartcards=None, **kwargs):
"""
Args:
smartcards (dict): Mapping of smartcard names to system libraries
compatible with `ssh-add -s '' ...`.
"""
self.smartcards = smartcards
self.__port_forwarding_register = PortForwardingRegister()
FileSystemClient.__init_with_kwargs__(self, kwargs, port=self.DEFAULT_PORT)
# Note: self._init is called by FileSystemClient constructor.
class DatabaseClient(Duct, MagicsProvider):
"""
An abstract class providing the common API for all database clients.
Note: `DatabaseClient` subclasses are callable, so that one can use
`DatabaseClient(...)` as a short-hand for `DatabaseClient.query(...)`.
Class Attributes:
DUCT_TYPE (`Duct.Type`): The type of `Duct` protocol implemented by this class.
DEFAULT_PORT (int): The default port for the database service (defined
by subclasses).
CURSOR_FORMATTERS (dict
from omniduct.duct import Duct
from omniduct.utils.decorators import require_connection
from omniduct.utils.magics import MagicsProvider, process_line_arguments
class FileSystemClient(Duct, MagicsProvider):
"""
An abstract class providing the common API for all filesystem clients.
Class Attributes:
DUCT_TYPE (`Duct.Type`): The type of `Duct` protocol implemented by this class.
DEFAULT_PORT (int): The default port for the filesystem service (defined
by subclasses).
"""
DUCT_TYPE = Duct.Type.FILESYSTEM
DEFAULT_PORT = None
@quirk_docs('_init', mro=True)
def __init__(self, cwd=None, home=None, read_only=False, global_writes=False, **kwargs):
"""
cwd (None, str): The path prefix to use as the current working directory
(if None, the user's home directory is used where that makes sense).
home (None, str): The path prefix to use as the current users' home
directory. If not specified, it will default to an implementation-
specific value (often '/').
read_only (bool): Whether the filesystem should only be able to perform
read operations.
global_writes (bool): Whether to allow writes outside of the user's home
folder.
**kwargs (dict): Additional keyword arguments to passed on to subclasses.
"""
- Ensures value of self.port is an integer (or None).
"""
# Import necessary classes lazily (to prevent dependency cycles)
from omniduct.registry import DuctRegistry
from omniduct.caches.base import Cache
from omniduct.remotes.base import RemoteClient
# Check registry is of an appropriate type (if present)
assert (self.registry is None) or isinstance(self.registry, DuctRegistry), "Provided registry is not an instance of `omniduct.registry.DuctRegistry`."
# If registry is present, lookup remotes and caches if necessary
if self.registry is not None:
if self.remote and isinstance(self.remote, six.string_types):
self.__prepreparation_values['remote'] = self.remote
self.remote = self.registry.lookup(self.remote, kind=Duct.Type.REMOTE)
if self.cache and isinstance(self.cache, six.string_types):
self.__prepreparation_values['cache'] = self.cache
self.cache = self.registry.lookup(self.cache, kind=Duct.Type.CACHE)
# Check if remote and cache objects are of correct type (if present)
assert (self.remote is None) or isinstance(self.remote, RemoteClient), "Provided remote is not an instance of `omniduct.remotes.base.RemoteClient`."
assert (self.cache is None) or isinstance(self.cache, Cache), "Provided cache is not an instance of `omniduct.caches.base.Cache`."
# Replace prepared fields with the result of calling existing values
# with a reference to `self`.
for field in self.prepared_fields:
value = getattr(self, field)
if hasattr(value, '__call__'):
self.__prepreparation_values[field] = value
setattr(self, field, value(self))
# Import necessary classes lazily (to prevent dependency cycles)
from omniduct.registry import DuctRegistry
from omniduct.caches.base import Cache
from omniduct.remotes.base import RemoteClient
# Check registry is of an appropriate type (if present)
assert (self.registry is None) or isinstance(self.registry, DuctRegistry), "Provided registry is not an instance of `omniduct.registry.DuctRegistry`."
# If registry is present, lookup remotes and caches if necessary
if self.registry is not None:
if self.remote and isinstance(self.remote, six.string_types):
self.__prepreparation_values['remote'] = self.remote
self.remote = self.registry.lookup(self.remote, kind=Duct.Type.REMOTE)
if self.cache and isinstance(self.cache, six.string_types):
self.__prepreparation_values['cache'] = self.cache
self.cache = self.registry.lookup(self.cache, kind=Duct.Type.CACHE)
# Check if remote and cache objects are of correct type (if present)
assert (self.remote is None) or isinstance(self.remote, RemoteClient), "Provided remote is not an instance of `omniduct.remotes.base.RemoteClient`."
assert (self.cache is None) or isinstance(self.cache, Cache), "Provided cache is not an instance of `omniduct.caches.base.Cache`."
# Replace prepared fields with the result of calling existing values
# with a reference to `self`.
for field in self.prepared_fields:
value = getattr(self, field)
if hasattr(value, '__call__'):
self.__prepreparation_values[field] = value
setattr(self, field, value(self))
if isinstance(self._host, (list, tuple)):
if '_host' not in self.__prepreparation_values:
self.__prepreparation_values['_host'] = self._host