Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
line = self.plugin.chain("line_pre_hook", line, formatter=self)
before, after = line.split("#", 1)
# line must only have preceding spaces
if not re.match(r"^\s*$", before):
return line
splitted = after.strip().split(" ", 1)
command = splitted[0]
args = []
if len(splitted) > 1:
args = splitted[1]
try:
func = DriverManager(DRIVER_NS, command)
func.propagate_map_exceptions = True
except NoMatches:
self.logger.debug("Command not found %s", command, exc_info=True)
return line
output = func(lambda ext: ext.plugin(args, line=line, formatter=self))
if output is None:
output = []
elif isinstance(output, str):
output = output.split(os.linesep)
output = os.linesep.join([before + item for item in output])
output = self.plugin.chain("line_post_hook", output, formatter=self)
return output
:return: resp: Response instance
:type: resp: `api.common.Response`
"""
project_id = req._headers.get('X-Project-ID')
queue_name = req._body.get('queue_name')
options = req._body.get('options', {})
ttl = req._body.get('ttl', self._defaults.subscription_ttl)
LOG.debug(
u'Subscription create - queue: %(queue)s, project: %(project)s',
{'queue': queue_name,
'project': project_id})
try:
url = netutils.urlsplit(subscriber)
mgr = driver.DriverManager('zaqar.notification.tasks', url.scheme,
invoke_on_load=True)
req_data = req._env.copy()
mgr.driver.register(subscriber, options, ttl, project_id, req_data)
data = {'subscriber': subscriber,
'options': options,
'ttl': ttl}
self._validate.subscription_posting(data)
self._validate.queue_identification(queue_name, project_id)
if not self._queue_controller.exists(queue_name, project_id):
self._queue_controller.create(queue_name, project=project_id)
created = self._subscription_controller.create(queue_name,
subscriber,
data['ttl'],
data['options'],
project=project_id)
'"database", "file", ...')
default_dispatcher = 'database'
options = urlparse.parse_qs(parsed_url.query)
self.dispatcher_name = options.get('dispatcher',
[default_dispatcher])[-1]
self._sample_dispatcher = None
self._event_dispatcher = None
try:
self.sample_driver = driver.DriverManager(
'ceilometer.dispatcher.meter', self.dispatcher_name).driver
except stevedore.exception.NoMatches:
self.sample_driver = None
try:
self.event_driver = driver.DriverManager(
'ceilometer.dispatcher.event', self.dispatcher_name).driver
except stevedore.exception.NoMatches:
self.event_driver = None
def make_provider(name, request, url):
"""Returns an instance of :class:`mfr.core.provider.BaseProvider`
:param str name: The name of the provider to instantiate. (osf)
:param request:
:param dict url:
:rtype: :class:`mfr.core.provider.BaseProvider`
"""
try:
return driver.DriverManager(
namespace='mfr.providers',
name=name.lower(),
invoke_on_load=True,
invoke_args=(request, url, ),
).driver
except RuntimeError:
raise exceptions.MakeProviderError(
'"{}" is not a supported provider'.format(name.lower()),
namespace='mfr.providers',
name=name.lower(),
invoke_on_load=True,
invoke_args={
'request': request,
'url': url,
}
:param verify: The verification arguments to pass to requests. These
are of the same form as requests expects, so True or
False to verify (or not) against system certificates or
a path to a bundle or CA certs to check against.
(optional, defaults to True)
:param cert: A client certificate to pass to requests. These are of the
same form as requests expects. Either a single filename
containing both the certificate and key or a tuple
containing the path to the certificate then a path to the
key. (optional)
:param float timeout: A timeout to pass to requests. This should be a
numerical value indicating some amount
(or fraction) of seconds or 0 for no timeout.
(optional, defaults to 0)
"""
self.auth_plugin = driver.DriverManager('gringotts.client_auth_plugin',
auth_plugin,
invoke_on_load=True,
invoke_args=args,
invoke_kwds=kwargs)
self.auth_plugin = self.auth_plugin.driver
self.session = requests.Session()
self.verify = verify
self.cert = cert
self.timeout = None
if timeout is not None:
self.timeout = float(timeout)
def get_backend():
    """Return the migration backend driver, loading it on first use.

    The driver is looked up in the ``zun.database.migration_backend``
    namespace under the name given by ``CONF.database.backend`` and is
    cached in the module-level ``_IMPL`` so later calls reuse it.
    """
    global _IMPL
    # Fast path: a backend has already been loaded and cached.
    if _IMPL:
        return _IMPL

    # Make sure the ``backend`` option is registered before it is read.
    zun.conf.CONF.import_opt('backend',
                             'oslo_db.options', group='database')
    manager = driver.DriverManager("zun.database.migration_backend",
                                   zun.conf.CONF.database.backend)
    _IMPL = manager.driver
    return _IMPL
def _get_connection(conf):
    """Load the configured engine and return an instance.

    The engine name is the URL scheme of ``conf.database.connection``; the
    matching driver is looked up in ``DB_ENGINE_NAMESPACE`` and called with
    ``conf`` to build the returned object.
    """
    scheme = urlparse.urlparse(conf.database.connection).scheme
    LOG.debug('looking for %s driver in %s', scheme, DB_ENGINE_NAMESPACE)
    # No invoke_on_load here: ``.driver`` is the plugin itself, called below.
    return driver.DriverManager(DB_ENGINE_NAMESPACE, scheme).driver(conf)