Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def tag_image(manifest_pk, tag, repository_pk):
    """
    Bind a tag name to a manifest and record a newly created Tag.

    The manifest, its first artifact, the repository and the repository's latest
    version are looked up. A Tag tying ``tag`` to the manifest is then fetched or
    created, and a freshly created Tag is persisted as a CreatedResource.

    NOTE(review): the visible body looks cut short. ``artifact``, ``latest_version``
    and ``tags_to_remove`` are computed but not consumed here, and no new repository
    version is produced. Presumably the remaining steps (replacing an old tag of the
    same name in a new repository version) are missing from this view -- confirm
    against the complete task before relying on this function.

    Args:
        manifest_pk: Primary key of the Manifest to tag.
        tag: Name of the tag to associate with the manifest.
        repository_pk: Primary key of the Repository whose latest version is inspected.
    """
    manifest = Manifest.objects.get(pk=manifest_pk)
    # Indexing with [0] fails with IndexError when the manifest has no artifacts.
    artifact = manifest._artifacts.all()[0]

    repository = Repository.objects.get(pk=repository_pk)
    latest_version = RepositoryVersion.latest(repository)

    # Tags in the latest version that carry the same name but point at another manifest.
    tags_to_remove = Tag.objects.filter(
        pk__in=latest_version.content.all(),
        name=tag,
    ).exclude(
        tagged_manifest=manifest,
    )

    manifest_tag, created = Tag.objects.get_or_create(
        name=tag,
        tagged_manifest=manifest,
    )

    if created:
        # Persist the record; merely instantiating CreatedResource would leave it unsaved.
        CreatedResource.objects.create(content_object=manifest_tag)
def synchronize(remote_pk, repository_pk):
    """
    Synchronize a repository with a Docker remote.

    Builds a declarative version from the remote's first stage and creates it for
    the given repository.

    Args:
        remote_pk (str): The remote PK.
        repository_pk (str): The repository PK.

    Raises:
        ValueError: If the remote does not specify a URL to sync
    """
    docker_remote = DockerRemote.objects.get(pk=remote_pk)
    target_repository = Repository.objects.get(pk=repository_pk)

    # Without a URL there is nothing to synchronize from.
    if not docker_remote.url:
        raise ValueError(_('A remote must have a url specified to synchronize.'))

    declarative_version = DeclarativeVersion(
        DockerFirstStage(docker_remote),
        target_repository,
    )
    declarative_version.create()
# NOTE(review): this fragment starts inside a function whose `def` line and opening
# `try:` are not visible here, and the original indentation has been lost.
except ImporterError as exc:
# Importer failures are logged at info level and then re-raised unchanged.
log.info(f"Collection processing was not successfull: {exc}")
raise
except Exception as exc:
# Any other failure is reported through user_facing_logger at error level and re-raised.
user_facing_logger.error(f"Collection processing was not successfull: {exc}")
raise
# Link the artifact to the collection version under the version's own relative_path.
ContentArtifact.objects.create(
artifact=artifact,
content=collection_version,
relative_path=collection_version.relative_path,
)
# Record the collection version as a CreatedResource.
CreatedResource.objects.create(content_object=collection_version)
# Only when a repository PK was supplied: add the collection version to a new
# version of that repository.
if repository_pk:
repository = Repository.objects.get(pk=repository_pk)
content_q = CollectionVersion.objects.filter(pk=collection_version.pk)
with repository.new_version() as new_version:
new_version.add_content(content_q)
# NOTE(review): presumably still inside the `if repository_pk:` branch, because
# `repository` is only bound there -- confirm against the original indentation.
CreatedResource.objects.create(content_object=repository)
def synchronize(remote_pk, repository_pk):
    """
    Synchronize a repository with a Docker remote, de-duplicating tags by name.

    Logs the repository and remote names, then builds a Docker declarative version
    from the remote's first stage and creates it for the given repository.

    Args:
        remote_pk (str): The remote PK.
        repository_pk (str): The repository PK.

    Raises:
        ValueError: If the remote does not specify a URL to sync
    """
    docker_remote = DockerRemote.objects.get(pk=remote_pk)
    target_repository = Repository.objects.get(pk=repository_pk)

    # Without a URL there is nothing to synchronize from.
    if not docker_remote.url:
        raise ValueError(_('A remote must have a url specified to synchronize.'))

    # Handed to the declarative version as `remove_duplicates`; presumably makes Tag
    # names unique within the resulting version -- confirm against the pulpcore API.
    duplicate_spec = [{'model': Tag, 'field_names': ['name']}]

    message_template = _('Synchronizing: repository={r} remote={p}')
    log.info(message_template.format(r=target_repository.name, p=docker_remote.name))

    declarative_version = DockerDeclarativeVersion(
        DockerFirstStage(docker_remote),
        target_repository,
        remove_duplicates=duplicate_spec,
    )
    declarative_version.create()
# NOTE(review): the `def` line that owns this docstring and body is not visible here.
# `mirror`, `remote_pk` and `repository_pk` are presumably its parameters (the
# docstring lists all three) -- confirm against the complete file.
"""
Sync content from the remote repository.
Create a new version of the repository that is synchronized with the remote.
Args:
remote_pk (str): The remote PK.
repository_pk (str): The repository PK.
mirror (bool): True for mirror mode, False for additive.
Raises:
ValueError: If the remote does not specify a URL to sync
"""
# Load the Apt remote and the target repository by primary key.
remote = AptRemote.objects.get(pk=remote_pk)
repository = Repository.objects.get(pk=repository_pk)
# A remote without a URL cannot be synchronized.
if not remote.url:
raise ValueError(_("A remote must have a url specified to synchronize."))
# Build the first stage from the remote, then create the declarative version,
# forwarding `mirror` unchanged.
first_stage = DebFirstStage(remote)
DebDeclarativeVersion(first_stage, repository, mirror=mirror).create()
def copy_content(source_repo_version_pk, dest_repo_pk, types):
    """
    Copy content of the requested types from a repository version into a repository.

    A new version of the destination repository is created and every content unit
    of the source version whose ``_type`` is listed in ``types`` is added to it.
    An empty ``types`` selects no content.

    Args:
        source_repo_version_pk: repository version primary key to copy units from
        dest_repo_pk: repository primary key to copy units into
        types: a tuple of strings representing the '_type' values of types to include
            in the copy
    """
    source_repo_version = RepositoryVersion.objects.get(pk=source_repo_version_pk)
    dest_repo = Repository.objects.get(pk=dest_repo_pk)

    # One `__in` lookup matches the same rows as OR-ing a Q(_type=...) per type.
    # Unlike the hand-built chain it also stays valid for an empty `types`, where
    # the chain was left as None and ended up in `filter(None)`.
    content_to_copy = source_repo_version.content.filter(_type__in=types)

    with RepositoryVersion.create(dest_repo) as new_version:
        new_version.add_content(content_to_copy)
def one_shot_upload(artifact_pk, repository_pk=None):
    """
    Build a Package from an uploaded deb artifact, reusing an existing one if present.

    The artifact's control paragraph is parsed into Package fields. A Package with
    the same sha256 and relative path is looked up; when none exists, the newly
    built Package is saved inside a transaction.

    NOTE(review): in the visible body the repository is only looked up and logged;
    the step that would insert the Package into it is presumably cut off -- confirm.

    Args:
        artifact_pk (str): Create a Package from this artifact.

    Optional:
        repository_pk (str): Insert the Package into this Repository.
    """
    artifact = Artifact.objects.get(pk=artifact_pk)

    repository = None
    if repository_pk:
        repository = Repository.objects.get(pk=repository_pk)

    debug_template = _("Uploading deb package: artifact={artifact}, repository={repo}")
    log.debug(debug_template.format(artifact=artifact, repo=repository))

    # Read the control section of the .deb and turn it into Package keyword arguments.
    control_paragraph = debfile.DebFile(fileobj=artifact.file).debcontrol()
    package_fields = Package.from822(control_paragraph)
    package_fields["sha256"] = artifact.sha256

    package = Package(**package_fields)
    package.relative_path = package.filename()

    try:
        # Prefer an already stored Package with the same checksum and path.
        package = Package.objects.get(
            sha256=artifact.sha256,
            relative_path=package.relative_path,
        )
    except ObjectDoesNotExist:
        with transaction.atomic():
            package.save()
def recursive_add_content(repository_pk, content_units):
"""
Create a new repository version by recursively adding content.
For each unit that is specified, we also need to add related content. For example, if a
manifest-list is specified, we need to add all referenced manifests, and all blobs referenced
by those manifests.
Args:
repository_pk (int): The primary key for a Repository for which a new Repository Version
should be created.
content_units (list): List of PKs for :class:`~pulpcore.app.models.Content` that
should be added to the previous Repository Version for this Repository.
"""
repository = Repository.objects.get(pk=repository_pk)
tags_to_add = Tag.objects.filter(
pk__in=content_units,
)
manifest_lists_to_add = Manifest.objects.filter(
pk__in=content_units,
media_type=MEDIA_TYPE.MANIFEST_LIST
) | Manifest.objects.filter(
pk__in=tags_to_add.values_list('tagged_manifest', flat=True),
media_type=MEDIA_TYPE.MANIFEST_LIST,
)
manifests_to_add = Manifest.objects.filter(
pk__in=content_units,
media_type__in=[MEDIA_TYPE.MANIFEST_V1, MEDIA_TYPE.MANIFEST_V1_SIGNED,