import pytest
import wily.operators


def test_resolve_operator():
    op = wily.operators.resolve_operator("cyclomatic")
    assert op == wily.operators.OPERATOR_CYCLOMATIC


def test_resolve_bad_operator():
    with pytest.raises(ValueError):
        wily.operators.resolve_operator("banana")
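# Two companion tests, sketched on the same pattern. They assume wily's
# dotted "<operator>.<metric>" naming (e.g. "cyclomatic.complexity") and
# that resolve_metric raises ValueError when the operator half is unknown;
# treat them as a sketch against the installed wily version, not canonical
# project tests.
def test_resolve_metric():
    metric = wily.operators.resolve_metric("cyclomatic.complexity")
    assert metric.name == "complexity"


def test_resolve_bad_metric():
    with pytest.raises(ValueError):
        wily.operators.resolve_metric("banana.split")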
# Resolve the revision into an archiver revision and check it is in the cache
rev = resolve_archiver(state.default_archiver).cls(config).find(revision)
logger.debug(f"Resolved {revision} to {rev.key} ({rev.message})")
try:
    target_revision = state.index[state.default_archiver][rev.key]
except KeyError:
    logger.error(
        f"Revision {revision} is not in the cache, make sure you have run wily build."
    )
    exit(1)

logger.info(
    f"Comparing current with {format_revision(target_revision.revision.key)} by "
    f"{target_revision.revision.author_name} on "
    f"{format_date(target_revision.revision.date)}."
)

# Convert the list of metrics to a list of metric instances
operators = {resolve_operator(metric.split(".")[0]) for metric in metrics}
metrics = [(metric.split(".")[0], resolve_metric(metric)) for metric in metrics]
results = []

# Run each operator in a separate process and key the results by operator name
with multiprocessing.Pool(processes=len(operators)) as pool:
    operator_exec_out = pool.starmap(
        run_operator, [(operator, None, config, targets) for operator in operators]
    )
data = {}
for operator_name, result in operator_exec_out:
    data[operator_name] = result

# Write a summary table; object-level operators get extra per-object rows
extra = []
for operator, metric in metrics:
    if detail and resolve_operator(operator).level == OperatorLevel.Object:
        for file in files:
            try:
                extra.extend(
                    [
                        f"{file}:{k}"
                        for k in data[operator][file]["detailed"].keys()
                        if k != metric.name
                        and isinstance(data[operator][file]["detailed"][k], dict)
                    ]
                )
            except KeyError:
                logger.debug(f"File {file} not in cache")
                logger.debug("Cache follows -- ")
                logger.debug(data[operator])
files.extend(extra)
logger.debug(files)
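# The Pool/starmap fan-out above generalises: run one analysis job per
# operator in parallel and collect the results keyed by name. A minimal,
# self-contained sketch of the same pattern (run_job and the job list are
# illustrative stand-ins, not wily's API):
import multiprocessing


def run_job(name, target):
    # Stand-in for run_operator: analyse `target` and return (name, result).
    return name, {"target": target, "score": len(name)}


if __name__ == "__main__":
    jobs = [("cyclomatic", "src/"), ("raw", "src/"), ("maintainability", "src/")]
    with multiprocessing.Pool(processes=len(jobs)) as pool:
        out = pool.starmap(run_job, jobs)
    data = {name: result for name, result in out}
    print(data)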
# Inside the per-operator loop of `wily build`: carry forward results for
# files not re-analysed in this revision (missing_indices is computed
# earlier as prev_indices - indices)
for missing in missing_indices:
    result[missing] = prev_stats["operator_data"][operator_name][missing]

# Aggregate metrics across all root paths using the aggregate function in the metric
for root in roots:
    # find all matching entries recursively
    aggregates = [
        path for path in result.keys() if root in pathlib.Path(path).parents
    ]
    result[str(root)] = {"total": {}}
    # aggregate values
    for metric in resolve_operator(operator_name).cls.metrics:
        func = metric.aggregate
        values = [
            result[aggregate]["total"][metric.name]
            for aggregate in aggregates
            if aggregate in result
            and metric.name in result[aggregate]["total"]
        ]
        if len(values) > 0:
            result[str(root)]["total"][metric.name] = func(values)

prev_indices = set(result.keys())
prev_roots = roots
stats["operator_data"][operator_name] = result
bar.next()

# Keep this revision's stats for the next iteration's carry-forward step
prev_stats = stats
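# The aggregation loop above rolls per-file totals up to their parent
# directories with each metric's aggregate function (sum, max, and so on).
# A minimal sketch of that roll-up with made-up file totals and sum as the
# aggregate:
import pathlib

file_totals = {
    "src/a.py": {"total": {"complexity": 3}},
    "src/b.py": {"total": {"complexity": 5}},
}
root = pathlib.Path("src")
# find all entries whose path sits under the root directory
aggregates = [p for p in file_totals if root in pathlib.Path(p).parents]
values = [file_totals[p]["total"]["complexity"] for p in aggregates]
file_totals[str(root)] = {"total": {"complexity": sum(values)}}
print(file_totals[str(root)])  # {'total': {'complexity': 8}}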
def get_default_metrics(config):
    """
    Get the default metrics for a configuration.

    :return: Return the list of default metrics in this index
    :rtype: ``list`` of ``str``
    """
    archivers = list_archivers(config)
    default_metrics = []
    for archiver in archivers:
        index = get_archiver_index(config, archiver)

        if len(index) == 0:
            logger.warning("No records found in the index, no metrics available")
            return []

        operators = index[0]["operators"]
        for operator in operators:
            o = resolve_operator(operator)

            if o.cls.default_metric_index is not None:
                metric = o.cls.metrics[o.cls.default_metric_index]
                default_metrics.append("{0}.{1}".format(o.cls.name, metric.name))
    return default_metrics
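# The strings assembled above are dotted "<operator>.<metric>" pairs that
# feed straight back into resolve_metric / resolve_operator elsewhere in
# wily. A tiny illustration of the round trip (names are examples):
name = "{0}.{1}".format("cyclomatic", "complexity")
assert name == "cyclomatic.complexity"
assert name.split(".")[0] == "cyclomatic"  # the operator half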