Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@specs.parameter('version_spec', yaqltypes.String(True))
def __init__(self, name, default_name=None, version_spec=None):
    """Initialise ``type``, ``default_type`` and ``version_spec``.

    :param name: callable invoked with ``self.context``; the ``type``
        attribute of its result becomes ``self.type``.
    :param default_name: callable invoked with ``self.context``; when its
        result is falsy, ``self.default_type`` falls back to ``self.type``.
    :param version_spec: stored unchanged on the instance.
    """
    resolved = name(self.context)
    self.type = resolved.type
    # NOTE(review): default_name defaults to None yet is called
    # unconditionally; presumably the framework always passes a callable
    # here -- confirm against the parameter declarations.
    default_candidate = default_name(self.context)
    self.default_type = (
        default_candidate if default_candidate else self.type)
    self.version_spec = version_spec
@specs.parameter('handler', yaqltypes.String(nullable=True))
def unsubscribe_destruction(publisher, subscriber, handler=None):
    """Drop the destruction dependency linking publisher and subscriber.

    Looks up the dependency matching ``publisher.real_this``,
    ``subscriber.real_this`` and ``handler`` through
    ``GarbageCollector._find_dependency`` and, when one is found, removes
    it from the publisher's ``destruction_dependencies``. Does nothing if
    no such dependency is found.

    :param publisher: object whose ``real_this`` holds the dependencies.
    :param subscriber: object whose ``real_this`` is the dependent side.
    :param handler: optional method name on the subscriber's type.
    """
    source = publisher.real_this
    listener = subscriber.real_this
    if handler:
        # Result is discarded; presumably this call fails when the
        # handler method cannot be resolved -- confirm.
        subscriber.type.find_single_method(handler)
    registered = source.destruction_dependencies
    found = GarbageCollector._find_dependency(source, listener, handler)
    if not found:
        return
    registered.remove(found)
@specs.parameter('handler', yaqltypes.String(nullable=True))
def subscribe_destruction(publisher, subscriber, handler=None):
    """Register subscriber as a destruction dependency of publisher.

    A new entry is appended to the publisher's
    ``destruction_dependencies`` only when
    ``GarbageCollector._find_dependency`` reports no existing match for
    the same publisher, subscriber and handler.

    :param publisher: object whose ``real_this`` receives the dependency.
    :param subscriber: object whose ``real_this`` is stored through
        ``helpers.weak_ref``.
    :param handler: optional method name on the subscriber's type.
    """
    source = publisher.real_this
    listener = subscriber.real_this
    if handler:
        # Result is discarded; presumably this call fails when the
        # handler method cannot be resolved -- confirm.
        subscriber.type.find_single_method(handler)
    existing = GarbageCollector._find_dependency(source, listener, handler)
    if existing:
        # Already subscribed with this handler; avoid a duplicate entry.
        return
    entry = {
        'subscriber': helpers.weak_ref(listener),
        'handler': handler,
    }
    source.destruction_dependencies.append(entry)
@specs.parameter('_Logger__message', yaqltypes.String())
@inject_format
def info(__self, __yaql_format_function, __message, *args, **kwargs):
    """Forward a message to the underlying logger's ``info`` method.

    Delegates to ``_log``, handing it the underlying logger's ``info``
    callable, the injected format function, the message and the
    positional and keyword arguments collected by this call.
    """
    # Parameter names keep their double-underscore prefix: the decorator
    # above refers to the message by its mangled name.
    dispatch = __self._log
    dispatch(
        __self._underlying_logger.info,
        __yaql_format_function,
        __message,
        args,
        kwargs,
    )
@specs.parameter('value', yaqltypes.String())
@specs.extension_method
def base64encode(value):
    """Return ``value`` as encoded by ``base64.encode_as_text``."""
    # NOTE(review): the standard-library base64 module has no
    # encode_as_text, so ``base64`` here is presumably another module
    # imported under that name -- confirm against the file's imports.
    encoded_text = base64.encode_as_text(value)
    return encoded_text
@specs.parameter('string', yaqltypes.String())
@specs.parameter('format__', yaqltypes.String(True))
def datetime_from_string(string, format__=None):
    """Parse ``string`` into a datetime that always carries a tzinfo.

    :param string: text to parse.
    :param format__: optional format passed to ``DATETIME_TYPE.strptime``;
        when empty or None, ``parser.parse`` is used instead.
    :returns: the parsed value; if it has no tzinfo, a copy with
        ``tzinfo=UTCTZ`` is returned.
    """
    if format__:
        parsed = DATETIME_TYPE.strptime(string, format__)
    else:
        parsed = parser.parse(string)
    # Values parsed without a timezone get UTCTZ attached.
    return parsed if parsed.tzinfo else parsed.replace(tzinfo=UTCTZ)
@specs.parameter('string', yaqltypes.String())
@specs.parameter('selector', yaqltypes.Lambda(with_context=True))
@specs.method
def search(context, regexp, string, selector=None):
    """Search ``string`` with ``regexp`` and return the match or a projection.

    :param context: passed to ``_publish_match`` and to ``selector``.
    :param regexp: object exposing ``search(string)``.
    :param string: text to search.
    :param selector: optional callable taking ``context``.
    :returns: None when nothing matches; the whole matched text when no
        selector is given; otherwise the selector's result, computed after
        the match has been passed to ``_publish_match``.
    """
    found = regexp.search(string)
    if found is None:
        return None
    if selector is not None:
        _publish_match(context, found)
        return selector(context)
    return found.group()
@specs.parameter('pattern', yaqltypes.String())
@specs.parameter('string', yaqltypes.String())
@specs.name('#operator_!~')
def not_matches_operator_string(string, pattern):
    """Return True when ``pattern`` matches nowhere in ``string``.

    :param string: text to search.
    :param pattern: regular expression source.
    :returns: True if the search finds no match, False otherwise.
    """
    hit = re.compile(pattern).search(string)
    return hit is None
@specs.parameter('entry_point', yaqltypes.String())
def publish_blueprint(self, entry_point):
    """Upload the blueprint unless ``_check_blueprint_exists`` is truthy.

    The resource for ``entry_point`` is obtained from the application
    package and uploaded under ``self._blueprint_id`` while holding
    ``archive_upload_lock``.

    :param entry_point: resource name passed to ``get_resource``.
    :raises cloudify_exceptions.CloudifyClientError: re-raised for any
        status code other than 409.
    """
    global archive_upload_lock
    if not self._check_blueprint_exists():
        blueprint_path = self._application_package.get_resource(entry_point)
        with archive_upload_lock:
            try:
                self._client.blueprints.upload(
                    blueprint_path, self._blueprint_id)
            except cloudify_exceptions.CloudifyClientError as err:
                # 409 is tolerated; presumably it signals the blueprint
                # was already uploaded -- confirm.
                if err.status_code != 409:
                    raise
@specs.parameter('value', yaqltypes.String())
@specs.extension_method
def decrypt_data(value):
options.set_defaults(oslo_cfg.CONF,
barbican_endpoint_type='internal')
manager = key_manager.API()
try:
context = castellan_utils.credential_factory(conf=cfg.CONF)
except castellan_exception.AuthTypeInvalidError as e:
LOG.exception(e)
LOG.error("Castellan must be correctly configured in order to use "
"decryptData()")
raise
try:
data = manager.get(context, value).get_encoded()
except castellan_exception.KeyManagerError as e:
LOG.exception(e)