Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if config.debug("plugins"):
print_warning(
"'register_plugin' function at %s: %s did not return a class."
% (path, modname))
else:
if config.debug("plugins"):
print_warning(
"no 'register_plugin' function at %s: %s"
% (path, modname))
# delete from sys.modules?
except Exception as e:
nameish = modname.split('.')[-1]
self.failed_plugins[nameish] = str(e)
if config.debug("plugins"):
import traceback
from rez.vendor.six.six import StringIO
out = StringIO()
traceback.print_exc(file=out)
print_debug(out.getvalue())
# load config
data, _ = _load_config_from_filepaths([os.path.join(path, "rezconfig")])
deep_update(self.config_data, data)
# load sourcefile that's been copied into package install payload
path = os.path.join(package.base, self.include_modules_subpath)
pathname = os.path.join(path, "%s-*.py" % name)
pathnames = glob(pathname)
if not pathnames:
return None
filepath = pathnames[0]
hash_str = filepath.rsplit('-', 1)[-1].split('.', 1)[0]
module = self.modules.get(hash_str)
if module is not None:
return module
if config.debug("file_loads"):
print_debug("Loading include sourcefile: %s" % filepath)
module = py23.load_module_from_file(name, filepath)
self.modules[hash_str] = module
return module
increase the speed of resource iteration.
"""
if child_resource_classes is not None:
child_resource_classes = [x for x in child_resource_classes
if x.has_ancestor(parent_resource.__class__)]
if not child_resource_classes:
return
for child_class in parent_resource.children():
if child_resource_classes and \
not any((child_class is x or x.has_ancestor(child_class))
for x in child_resource_classes):
continue
for child in child_class.iter_instances(parent_resource):
if not dicts_conflicting(variables or {}, child.variables):
if config.debug("resources"):
print_debug("%s%r" % (" " * (_depth + 1), child))
yield child
for grand_child in _iter_resources(child,
child_resource_classes,
variables,
_depth + 1):
yield grand_child
message = "Needs pip%s, but found '%s' for Python '%s'"
if version_text is None or not py_exe:
is_valid = False
if log_invalid:
print_debug(message, PIP_SPECIFIER, version_text, py_exe)
elif PackagingVersion(version_text) not in PIP_SPECIFIER:
is_valid = False
if log_invalid:
print_warning(message, PIP_SPECIFIER, version_text, py_exe)
return is_valid
_verbose = config.debug("package_release")
def _log(msg):
if _verbose:
print_debug(msg)
def _iter_filtered_resources(parent_resource, resource_classes, variables):
debug = config.debug("resources")
if debug:
keys = [x.key for x in resource_classes]
print_debug("\nSEARCHING RESOURCES:\nClasses: %r\nVariables: %r"
% (keys, variables))
print_debug("PARENT RESOURCE: %r" % parent_resource)
for child in _iter_resources(parent_resource, resource_classes, variables):
if isinstance(child, tuple(resource_classes)) \
and is_dict_subset(variables or {}, child.variables):
if debug:
print_debug("RESOURCE MATCH: %r" % child)
yield child
Returns:
dict: the metadata
"""
if loader is None:
loader = get_file_loader(filepath)
elif isinstance(loader, basestring):
loader = metadata_loaders[loader]
timings.start("resources.load_file")
try:
with open(filepath, 'r') as f:
doc = loader(f, filepath)
finally:
timings.end("resources.load_file")
if config.debug("resources"):
print_debug("loaded resource file: %s" % filepath)
return doc
env_ = None
env = conf.get("env")
if env:
env_ = os.environ.copy()
env_.update(env)
# If we have, ie, a list, and format_pretty is True, it will be printed
# as "1 2 3" instead of "[1, 2, 3]"
formatter.__dict__['format_pretty'] = conf.get(
"pretty_args", True)
args = conf.get("args", [])
args = [formatter.format(x) for x in args]
args = [expandvars(x, environ=env_) for x in args]
if self.settings.print_commands or config.debug("package_release"):
from subprocess import list2cmdline
toks = [program] + args
msgs = []
msgs.append("running command: %s" % list2cmdline(toks))
if env:
for key, value in env.iteritems():
msgs.append(" with: %s=%s" % (key, value))
if self.settings.print_commands:
print('\n'.join(msgs))
else:
for msg in msgs:
print_debug(msg)
if not self.execute_command(cmd_name=program,
match = re.search("alias (?P.*?)=(?P.*)", cmd)
key = match.groupdict()['key'].strip()
value = match.groupdict()['value'].strip()
if (value.startswith('"') and value.endswith('"')) or \
(value.startswith("'") and value.endswith("'")):
value = value[1:-1]
loc.append("alias('%s', %s)" % (key, _encode(value)))
else:
# assume we can execute this as a straight command
loc.append("command(%s)" % _encode(cmd))
except:
# if anything goes wrong, just fall back to bash command
loc.append("command(%s)" % _encode(cmd))
rex_code = '\n'.join(loc)
if config.debug("old_commands"):
br = '-' * 80
msg = textwrap.dedent(
"""
%s
OLD COMMANDS:
%s
NEW COMMANDS:
%s
%s
""") % (br, '\n'.join(commands), rex_code, br)
print_debug(msg)
return rex_code
def publish_message(self, data):
if not self.settings.host:
print_error("Did not publish message, host is not specified")
return
routing_key = self.settings.exchange_routing_key
print("Publishing AMQP message on %s..." % routing_key)
publish_message(
host=self.settings.host,
amqp_settings=self.settings,
routing_key=routing_key,
data=data
)
if config.debug("package_release"):
print_debug("Published message: %s" % (data))