Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if isinstance(commit_only, list):
for path_ in commit_only:
p = Path(path_)
if p.exists() or change_types.get(path_) == 'D':
self.repo.git.add(path_)
if not commit_only:
self.repo.git.add('--all')
if not commit_empty and not self.repo.index.diff('HEAD'):
if raise_if_empty:
raise errors.NothingToCommit()
return
if commit_message and not isinstance(commit_message, str):
raise errors.CommitMessageEmpty()
elif not commit_message:
argv = [os.path.basename(sys.argv[0])
] + [remove_credentials(arg) for arg in sys.argv[1:]]
commit_message = ' '.join(argv)
# Ignore pre-commit hooks since we have already done everything.
self.repo.index.commit(
commit_message,
committer=committer,
skip_hooks=True,
)
def fetch_template(source, ref='master', tempdir=None):
"""Fetch the template and checkout the relevant data.
:param source: url or full path of the templates repository
:param ref: reference for git checkout - branch, commit or tag
:param tempdir: temporary work directory path
:return: template manifest Path
"""
if tempdir is None:
tempdir = Path(tempfile.mkdtemp())
try:
# clone the repo locally without checking out files
template_repo = git.Repo.clone_from(source, tempdir, no_checkout=True)
except git.exc.GitCommandError as e:
raise errors.GitError(
'Cannot clone repo from {0}'.format(source)
) from e
try:
# fetch ref and set the HEAD
template_repo.remotes.origin.fetch(ref)
try:
template_repo.head.reset(template_repo.commit(ref))
except git.exc.BadName:
ref = 'origin/{0}'.format(ref)
template_repo.head.reset(template_repo.commit(ref))
git_repo = git.Git(str(tempdir))
except git.exc.GitCommandError as e:
raise errors.GitError(
'Cannot fetch and checkout reference {0}'.format(ref)
) from e
def name_validator(self, attribute, value):
"""Validate reference name."""
if not self.check_ref_format(value):
raise errors.ParameterError(
'The reference name "{0}" is not valid.'.format(value)
)
)
if auto_login and url.username and url.password:
try:
subprocess.run([
'docker',
'login',
url.hostname,
'-u',
url.username,
'--password-stdin',
],
check=True,
input=url.password.encode('utf-8'))
except subprocess.CalledProcessError:
raise errors.AuthenticationError(
'Check configuration of password or token in the registry URL'
)
return url
def validate_template_manifest(manifest):
"""Validate manifet content.
:param manifest: manifest file content
"""
if not isinstance(manifest, list):
raise errors.InvalidTemplateError((
'The repository doesn\'t contain a valid',
'"{0}" file'.format(TEMPLATE_MANIFEST)
))
for template in manifest:
if not isinstance(template, dict) or 'name' not in template:
raise errors.InvalidTemplateError((
'Every template listed in "{0}"',
' must have a name'.format(TEMPLATE_MANIFEST)
))
for attribute in ['folder', 'description']:
if attribute not in template:
raise errors.InvalidTemplateError((
'Template "{0}" doesn\'t have a {1} attribute'.format(
template['name'], attribute
)
))
return True
def ensure_untracked(self, path):
"""Ensure that path is not part of git untracked files."""
untracked = self.repo.untracked_files
for file_path in untracked:
is_parent = str(file_path).startswith(path)
is_equal = path == file_path
if is_parent or is_equal:
raise errors.DirtyRenkuDirectory(self.repo)
last_output_id = len(outputs)
for output, input, path in self.find_explicit_outputs(
last_output_id
):
outputs.append(output)
paths.append(path)
if input is not None:
if input.id not in inputs: # pragma: no cover
raise RuntimeError('Inconsistent input name.')
inputs[input.id] = input
if unmodified:
raise errors.UnmodifiedOutputs(repo, unmodified)
if not no_output and not paths:
raise errors.OutputsNotFound(repo, inputs.values())
if client.has_external_storage:
client.track_paths_in_storage(*paths)
tool.inputs = list(inputs.values())
tool.outputs = outputs
# Requirement detection can be done anytime.
from .process_requirements import InitialWorkDirRequirement, \
InlineJavascriptRequirement
initial_work_dir_requirement = InitialWorkDirRequirement.from_tool(
tool,
existing_directories=existing_directories,
siblings you should check the lenght is greater than 1.
"""
parent = None
if isinstance(node, Entity):
parent_siblings = self.siblings(node.parent) - {node.parent}
return set(node.parent.members) | parent_siblings
elif isinstance(node, Generation):
parent = node.activity
elif isinstance(node, Usage):
parent = self.activities[node.commit]
elif isinstance(node, Process):
return {node}
if parent is None or not isinstance(parent, ProcessRun):
raise errors.InvalidOutputPath(
'The file "{0}" was not created by a renku command. \n\n'
'Check the file history using: git log --follow "{0}"'.format(
node.path
)
)
return set(parent.generated)
format(repo_path)
)
repo = clone(url, path=str(repo_path), install_githooks=False)
# Because the name of the default branch is not always 'master', we
# create an alias of the default branch when cloning the repo. It
# is used to refer to the default branch later.
renku_ref = 'refs/heads/' + RENKU_BRANCH
try:
repo.git.execute([
'git', 'symbolic-ref', renku_ref, repo.head.reference.path
])
checkout(repo, ref)
except GitCommandError as e:
raise errors.GitError(
'Cannot clone remote Git repo: {}'.format(url)
) from e
else:
return repo, repo_path
def ensure_clean(self, ignore_std_streams=False):
"""Make sure the repository is clean."""
dirty_paths = self.dirty_paths
mapped_streams = _mapped_std_streams(dirty_paths)
if ignore_std_streams:
if dirty_paths - set(mapped_streams.values()):
_clean_streams(self.repo, mapped_streams)
raise errors.DirtyRepository(self.repo)
elif self.repo.is_dirty(untracked_files=True):
_clean_streams(self.repo, mapped_streams)
raise errors.DirtyRepository(self.repo)