Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
ID_LIKE="fedora"
VERSION_ID="8.0"
PLATFORM_ID="platform:el8"
PRETTY_NAME="Red Hat Enterprise Linux 8.0 (Ootpa)"
ANSI_COLOR="0;31"
CPE_NAME="cpe:/o:redhat:enterprise_linux:8.0:GA"
HOME_URL="https://www.redhat.com/"
BUG_REPORT_URL="https://bugzilla.redhat.com/"
REDHAT_BUGZILLA_PRODUCT="Red Hat Enterprise Linux 8"
REDHAT_BUGZILLA_PRODUCT_VERSION=8.0
REDHAT_SUPPORT_PRODUCT="Red Hat Enterprise Linux"
REDHAT_SUPPORT_PRODUCT_VERSION="8.0"'''
class MockedDescriptor(Descriptor):
def __init__(self, descriptor):
self.schema = yaml.safe_load("""type: any""")
super(MockedDescriptor, self).__init__(descriptor)
for key, val in descriptor.items():
if isinstance(val, dict):
self._descriptor[key] = MockedDescriptor(val)
def test_merging_description_image():
desc1 = Image({'name': 'foo', 'version': 1}, None)
desc2 = Module({'name': 'mod1', 'version': 2,
'description': 'mod_desc'}, None, None)
merged = _merge_descriptors(desc1, desc2)
id: {type: str}
present: {type: bool}
url:
map:
repository: {type: str}
gpg: {type: str}
rpm: {type: str}
description: {type: str}
odcs:
map:
pulp: {type: str}
filename: {type: str}
""")
class Packages(Descriptor):
"""
Object representing packages
Args:
descriptor - yaml containing Packages section
"""
def __init__(self, descriptor, descriptor_path):
self.schema = packages_schema
self.descriptor_path = descriptor_path
super(Packages, self).__init__(descriptor)
# If 'content_sets' and 'content_sets_file' are defined at the same time
if set(['content_sets', 'content_sets_file']).issubset(set(descriptor.keys())):
raise CekitError(
"You cannot specify 'content_sets' and 'content_sets_file' together in the packages section!")
import yaml
from cekit.descriptor import Descriptor
env_schema = yaml.safe_load("""
map:
name: {type: str, required: True}
value: {type: any}
example: {type: any}
description: {type: str}""")
class Env(Descriptor):
"""Object representing Env variable
Args:
descriptor - yaml object containing Env variable
"""
def __init__(self, descriptor):
self.schema = env_schema
super(Env, self).__init__(descriptor)
@property
def name(self):
return self.get('name')
@name.setter
def name(self, value):
self._descriptor['name'] = value
from cekit.descriptor import Descriptor
run_schema = yaml.safe_load("""
map:
workdir: {type: str}
user: {type: text}
cmd:
seq:
- {type: str}
entrypoint:
seq:
- {type: str} """)
class Run(Descriptor):
"""Object representing Run configuration
If 'name' is not present 'run' string is used.
Args:
descriptor - a yaml containing descriptor object
"""
def __init__(self, descriptor):
self.schema = run_schema
super(Run, self).__init__(descriptor)
if 'name' not in self._descriptor:
self._descriptor['name'] = 'run'
self.skip_merging = ['cmd', 'entrypoint']
def merge(self, descriptor):
if not descriptor:
return self
from cekit.descriptor import Descriptor
execute_schemas = yaml.safe_load("""
map:
name: {type: str}
script: {type: str}
user: {type: text}""")
container_schemas = yaml.safe_load("""
seq:
- {type: any}""")
logger = logging.getLogger('cekit')
class Execute(Descriptor):
def __init__(self, descriptor, module_name):
self.schema = execute_schemas
super(Execute, self).__init__(descriptor)
descriptor['directory'] = module_name
descriptor['module_name'] = module_name
if 'name' not in descriptor:
# Generated name
descriptor['name'] = "{}/{}".format(module_name, descriptor['script'])
logger.debug("No value found for 'name' key in the execute section of the '{}' module; using auto-generated value: '{}'".format(
module_name, descriptor['name']))
@property
def name(self):
map:
name: {type: str}
branch: {type: str}
configuration: {type: any}
extra_dir: {type: str}
""")
configuration_schema = yaml.safe_load("""
map:
container: {type: any}
container_file: {type: str}
""")
class Osbs(Descriptor):
"""
Object Representing OSBS configuration
Args:
descriptor: dictionary object containing OSBS configuration
descriptor_path: path to descriptor file
"""
def __init__(self, descriptor, descriptor_path):
self.schema = osbs_schema
self.descriptor_path = descriptor_path
super(Osbs, self).__init__(descriptor)
if 'configuration' in self:
self['configuration'] = Configuration(self['configuration'], self.descriptor_path)
return _PathResource(descriptor, directory)
if 'url' in descriptor:
return _UrlResource(descriptor)
if 'git' in descriptor:
return _GitResource(descriptor)
if 'md5' in descriptor:
return _PlainResource(descriptor)
raise CekitError("Resource '{}' is not supported".format(descriptor))
class Resource(Descriptor):
"""
Base class for handling resources.
In most cases resources are synonym to artifacts.
"""
CHECK_INTEGRITY = True
def __init__(self, descriptor):
# Schema must be provided by the implementing class
if not self.schema:
raise CekitError("Resource '{}' has no schema defined".format(type(self).__name__))
# Includes validation
super(Resource, self).__init__(descriptor)
super(Modules, self).__init__(descriptor)
self._descriptor['repositories'] = [Resource(r, directory=path)
for r in self._descriptor.get('repositories', [])]
self._descriptor['install'] = [Install(x) for x in self._descriptor.get('install', [])]
@property
def repositories(self):
return self.get('repositories')
@property
def install(self):
return self.get('install')
class Install(Descriptor):
def __init__(self, descriptor):
self.schemas = install_schema
super(Install, self).__init__(descriptor)
@property
def name(self):
return self.get('name')
@name.setter
def name(self, value):
self._descriptor['name'] = value
@property
def version(self):
return self.get('version')
import logging
import os
import yaml
from cekit.descriptor import Descriptor
logger = logging.getLogger('cekit')
volume_schema = yaml.safe_load("""
map:
name: {type: str}
path: {type: str, required: True}""")
class Volume(Descriptor):
"""Object representing Volume.
If 'name' is not present its generated as basename of 'path'
Args:
descriptor - yaml file containing volume object
"""
def __init__(self, descriptor):
self.schema = volume_schema
super(Volume, self).__init__(descriptor)
if 'name' not in self._descriptor:
logger.warning("No value found for 'name' in 'volume'; using auto-generated value of '{}'".
format(os.path.basename(self._descriptor['path'])))
self._descriptor['name'] = os.path.basename(self._descriptor['path'])
@content_sets.setter
def content_sets(self, value):
self._descriptor['content_sets'] = value
self._descriptor.pop('content_sets_file', None)
@property
def content_sets_file(self):
return self.get('content_sets_file')
@content_sets_file.setter
def content_sets_file(self, value):
self._descriptor['content_sets_file'] = value
self._descriptor.pop('content_sets', None)
class Repository(Descriptor):
"""Object representing package repository
Args:
descriptor - repository name as referenced in cekit config file
"""
def __init__(self, descriptor):
self.schema = repository_schema
super(Repository, self).__init__(descriptor)
if not (('url' in descriptor) ^
('odcs' in descriptor) ^
('id' in descriptor) ^
('rpm' in descriptor)):
raise CekitError("Repository '%s' is invalid, you can use only one of "
"['id', 'odcs', 'rpm', 'url']"