Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def build(config: WilyConfig, archiver: Archiver, operators: List[Operator]):
"""
Build the history given a archiver and collection of operators.
:param config: The wily configuration
:param _archiver: The archiver to use
:param operators: The list of operators to execute
"""
try:
logger.debug(f"Using {archiver.name} archiver module")
_archiver = archiver.cls(config)
revisions = _archiver.revisions(config.path, config.max_revisions)
except InvalidGitRepositoryError:
# TODO: This logic shouldn't really be here (SoC)
logger.info(f"Defaulting back to the filesystem archiver, not a valid git repo")
_archiver = FilesystemArchiver(config)
revisions = _archiver.revisions(config.path, config.max_revisions)
except Exception as e:
if hasattr(e, "message"):
logger.error(f"Failed to setup archiver: '{e.message}'")
else:
logger.error(f"Failed to setup archiver: '{type(e)} - {e}'")
exit(1)
state = State(config, archiver=_archiver)
# Check for existence of cache, else provision
state.ensure_exists()
index = state.get_index(_archiver)
# remove existing revisions from the list
config.operators = operators.strip().split(",")
if archiver:
logger.debug(f"Fixing archiver to {archiver}")
config.archiver = archiver
if targets:
logger.debug(f"Fixing targets to {targets}")
config.targets = targets
build(
config=config,
archiver=resolve_archiver(config.archiver),
operators=resolve_operators(config.operators),
)
logger.info(
"Completed building wily history, run `wily report ` or `wily index` to see more."
)
state = State(config)
if not revision_index:
target_revision = state.index[state.default_archiver].last_revision
else:
rev = resolve_archiver(state.default_archiver).cls(config).find(revision_index)
logger.debug(f"Resolved {revision_index} to {rev.key} ({rev.message})")
try:
target_revision = state.index[state.default_archiver][rev.key]
except KeyError:
logger.error(
f"Revision {revision_index} is not in the cache, make sure you have run wily build."
)
exit(1)
logger.info(
f"-----------Rank for {metric.description} for {format_revision(target_revision.revision.key)} by {target_revision.revision.author_name} on {format_date(target_revision.revision.date)}.------------"
)
if path is None:
files = target_revision.get_paths(config, state.default_archiver, operator)
logger.debug(f"Analysing {files}")
else:
# Resolve target paths when the cli has specified --path
if config.path != DEFAULT_PATH:
targets = [str(Path(config.path) / Path(path))]
else:
targets = [path]
# Expand directories to paths
files = [
os.path.relpath(fn, config.path)
logger.debug(f"Targeting - {files}")
if not revision:
target_revision = state.index[state.default_archiver].last_revision
else:
rev = resolve_archiver(state.default_archiver).cls(config).find(revision)
logger.debug(f"Resolved {revision} to {rev.key} ({rev.message})")
try:
target_revision = state.index[state.default_archiver][rev.key]
except KeyError:
logger.error(
f"Revision {revision} is not in the cache, make sure you have run wily build."
)
exit(1)
logger.info(
f"Comparing current with {format_revision(target_revision.revision.key)} by {target_revision.revision.author_name} on {format_date(target_revision.revision.date)}."
)
# Convert the list of metrics to a list of metric instances
operators = {resolve_operator(metric.split(".")[0]) for metric in metrics}
metrics = [(metric.split(".")[0], resolve_metric(metric)) for metric in metrics]
results = []
# Build a set of operators
with multiprocessing.Pool(processes=len(operators)) as pool:
operator_exec_out = pool.starmap(
run_operator, [(operator, None, config, targets) for operator in operators]
)
data = {}
for operator_name, result in operator_exec_out:
data[operator_name] = result
def clean(ctx, yes):
"""Clear the .wily/ folder."""
config = ctx.obj["CONFIG"]
if not exists(config):
logger.info("Wily cache does not exist, nothing to remove.")
exit(0)
if not yes:
p = input("Are you sure you want to delete wily cache? [y/N]")
if p.lower() != "y":
exit(0)
from wily.cache import clean
clean(config)
def index(config, include_message=False):
"""
Show information about the cache and runtime.
:param config: The wily configuration
:type config: :namedtuple:`wily.config.WilyConfig`
:param include_message: Include revision messages
:type include_message: ``bool``
"""
state = State(config=config)
logger.debug("Running show command")
logger.info("--------Configuration---------")
logger.info(f"Path: {config.path}")
logger.info(f"Archiver: {config.archiver}")
logger.info(f"Operators: {config.operators}")
logger.info("")
logger.info("-----------History------------")
data = []
for archiver in state.archivers:
for rev in state.index[archiver].revisions[::-1]:
if include_message:
data.append(
(
format_revision(rev.revision.key),
rev.revision.author_name,
rev.revision.message[:MAX_MESSAGE_WIDTH],
format_date(rev.revision.date),
)
Show information about the cache and runtime.
:param config: The wily configuration
:type config: :namedtuple:`wily.config.WilyConfig`
:param include_message: Include revision messages
:type include_message: ``bool``
"""
state = State(config=config)
logger.debug("Running show command")
logger.info("--------Configuration---------")
logger.info(f"Path: {config.path}")
logger.info(f"Archiver: {config.archiver}")
logger.info(f"Operators: {config.operators}")
logger.info("")
logger.info("-----------History------------")
data = []
for archiver in state.archivers:
for rev in state.index[archiver].revisions:
if include_message:
data.append(
(
format_revision(rev.revision.key),
rev.revision.author_name,
rev.revision.message[:MAX_MESSAGE_WIDTH],
format_date(rev.revision.date),
)
)
else:
data.append(
(
def index(config, include_message=False):
"""
Show information about the cache and runtime.
:param config: The wily configuration
:type config: :namedtuple:`wily.config.WilyConfig`
:param include_message: Include revision messages
:type include_message: ``bool``
"""
state = State(config=config)
logger.debug("Running show command")
logger.info("--------Configuration---------")
logger.info(f"Path: {config.path}")
logger.info(f"Archiver: {config.archiver}")
logger.info(f"Operators: {config.operators}")
logger.info("")
logger.info("-----------History------------")
data = []
for archiver in state.archivers:
for rev in state.index[archiver].revisions:
if include_message:
data.append(
(
format_revision(rev.revision.key),
rev.revision.author_name,
rev.revision.message[:MAX_MESSAGE_WIDTH],
format_date(rev.revision.date),
table_content += f"{element}"
table_content += ""
report_template = report_template.safe_substitute(
headers=table_headers, content=table_content
)
with report_output.open("w") as output:
output.write(report_template)
try:
copytree(str(templates_dir / "css"), str(report_path / "css"))
except FileExistsError:
pass
logger.info(f"wily report was saved to {report_path}")
else:
print(
tabulate.tabulate(
headers=headers, tabular_data=data[::-1], tablefmt=console_format
)