Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# line must only have preceding whitespace
if not re.match(r"^\s*$", before):
return line
splitted = after.strip().split(" ", 1)
command = splitted[0]
args = []
if len(splitted) > 1:
args = splitted[1]
try:
func = DriverManager(DRIVER_NS, command)
func.propagate_map_exceptions = True
except NoMatches:
self.logger.debug("Command not found %s", command, exc_info=True)
return line
output = func(lambda ext: ext.plugin(args, line=line, formatter=self))
if output is None:
output = []
elif isinstance(output, str):
output = output.split(os.linesep)
output = os.linesep.join([before + item for item in output])
output = self.plugin.chain("line_post_hook", output, formatter=self)
return output
def map(self, *args, **kwargs):
    """Run the parent class's ``map()`` and tolerate a ``NoMatches`` error.

    All positional and keyword arguments are forwarded unchanged.

    :returns: Whatever the parent ``map()`` returns, or an empty list
              when the parent raises ``NoMatches``.
    """
    try:
        results = super().map(*args, **kwargs)
    except NoMatches:
        # Treat "nothing matched" as an empty result instead of an error.
        results = []
    return results
def _init_plugins(self, extensions):
    """Initialise plugins via the parent class, then require exactly one.

    :param extensions: Passed through unchanged to the parent
                       ``_init_plugins()``.
    :raises NoMatches: If no extension is present after initialisation.
    :raises MultipleMatches: If more than one extension is present.
    """
    super(DriverManager, self)._init_plugins(extensions)

    loaded = self.extensions

    # Guard 1: nothing was loaded at all.
    if not loaded:
        wanted = self._names[0]
        message = 'No %r driver found, looking for %r' % (
            self.namespace, wanted)
        raise NoMatches(message)

    # Guard 2: more than one candidate, so report every entry point target.
    if len(loaded) > 1:
        targets = ','.join([ext.entry_point_target for ext in loaded])
        message = 'Multiple %r drivers found: %s' % (
            self.namespace, targets)
        raise MultipleMatches(message)
The first argument to func(), 'ext', is the
:class:`~stevedore.extension.Extension` instance.
Exceptions raised from within func() are propagated up and
processing stopped if self.propagate_map_exceptions is True,
otherwise they are logged and ignored.
:param func: Callable to invoke for each extension.
:param args: Variable arguments to pass to func()
:param kwds: Keyword arguments to pass to func()
:returns: List of values returned from func()
"""
if not self.extensions:
# FIXME: Use a more specific exception class here.
raise NoMatches('No %s extensions found' % self.namespace)
response = []
for e in self.extensions:
self._invoke_one_plugin(response.append, func, e, args, kwds)
return response
The first argument to func(), 'ext', is the
:class:`~stevedore.extension.Extension` instance.
Exceptions raised from within func() are propagated up and
processing stopped if self.propagate_map_exceptions is True,
otherwise they are logged and ignored.
:param filter_func: Callable to test each extension.
:param func: Callable to invoke for each extension.
:param args: Variable arguments to pass to func()
:param kwds: Keyword arguments to pass to func()
:returns: List of values returned from func()
"""
if not self.extensions:
# FIXME: Use a more specific exception class here.
raise NoMatches('No %s extensions found' % self.namespace)
response = []
for e in self.extensions:
if filter_func(e, *args, **kwds):
self._invoke_one_plugin(response.append, func, e, args, kwds)
return response