Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_get_brew_url_when_build_was_removed(mocker):
mocker.patch('subprocess.check_output', side_effect=brew_call_removed)
with pytest.raises(CekitError) as excinfo:
tools.get_brew_url('aa')
assert 'Artifact with checksum aa was found in Koji metadata but the build is in incorrect state (DELETED) making the artifact not available for downloading anymore' in str(
excinfo.value)
def test_get_brew_url(mocker):
mocker.patch('subprocess.check_output', side_effect=brew_call_ok)
url = tools.get_brew_url('aa')
assert url == "http://download.devel.redhat.com/brewroot/packages/net.oauth.core-oauth/20100527/1/maven/net/oauth/core/oauth/20100527/oauth-20100527.jar"
def mocked_dependency_handler(mocker, data="ID=fedora\nNAME=somefedora\nVERSION=123"):
dh = None
with mocker.mock_module.patch('cekit.tools.os.path.exists') as exists_mock:
exists_mock.return_value = True
with mocker.mock_module.patch('cekit.tools.open', mocker.mock_open(read_data=data)):
dh = tools.DependencyHandler()
try:
yield dh
finally:
pass
def load_repository(self, repo_dir):
for modules_dir, _, files in os.walk(repo_dir):
if 'module.yaml' in files:
module_descriptor_path = os.path.abspath(os.path.expanduser(
os.path.normcase(os.path.join(modules_dir, 'module.yaml'))))
module = Module(tools.load_descriptor(module_descriptor_path),
modules_dir,
os.path.dirname(module_descriptor_path))
LOGGER.debug("Adding module '{}', path: '{}'".format(module.name, module.path))
self._module_registry.add_module(module)
def check_module_version(path, version):
descriptor = Module(tools.load_descriptor(os.path.join(path, 'module.yaml')),
path,
os.path.dirname(os.path.abspath(os.path.join(path, 'module.yaml'))))
if hasattr(descriptor, 'version') and descriptor.version != version:
raise CekitError("Requested conflicting version '%s' of module '%s'" %
(version, descriptor['name']))
modules = descriptor.get('modules')
if not modules.get('repositories'):
modules['repositories'] = [{'path': local_mod_path, 'name': 'modules'}]
self.image = Image(descriptor, os.path.dirname(os.path.abspath(str(descriptor_path))))
self._overrides = []
self.target = target
self._params = params
self._fetch_repos = False
self._module_registry = ModuleRegistry()
if overrides:
for override in overrides:
# TODO: If the overrides is provided as text, why do we try to get path to it?
logger.debug("Loading override '%s'" % (override))
self._overrides.append(Overrides(tools.load_descriptor(
override), os.path.dirname(os.path.abspath(override))))
# These should always come last
if self._params.get('tech_preview', False):
# Modify the image name, after all other overrides have been processed
self._overrides.append(self.get_tech_preview_overrides())
if self._params.get('redhat', False):
# Add the redhat specific stuff after everything else
self._overrides.append(self.get_redhat_overrides())
logger.info("Initializing image descriptor...")
def __init__(self, descriptor_path, target, overrides):
self._descriptor_path = descriptor_path
self._overrides = []
self.target = target
self._fetch_repos = False
self._module_registry = ModuleRegistry()
self.image = None
self.builder_images = []
if overrides:
for override in overrides:
# TODO: If the overrides is provided as text, why do we try to get path to it?
LOGGER.debug("Loading override '{}'".format(override))
self._overrides.append(Overrides(tools.load_descriptor(
override), os.path.dirname(os.path.abspath(override))))
LOGGER.info("Initializing image descriptor...")
def __init__(self, descriptor_path, target, builder, overrides, params):
self._type = builder
descriptor = tools.load_descriptor(descriptor_path)
# if there is a local modules directory and no modules are defined
# we will inject it for a backward compatibility
local_mod_path = os.path.join(os.path.abspath(os.path.dirname(descriptor_path)), 'modules')
if os.path.exists(local_mod_path) and 'modules' in descriptor:
modules = descriptor.get('modules')
if not modules.get('repositories'):
modules['repositories'] = [{'path': local_mod_path, 'name': 'modules'}]
self.image = Image(descriptor, os.path.dirname(os.path.abspath(str(descriptor_path))))
self._overrides = []
self.target = target
self._params = params
self._fetch_repos = False
self._module_registry = ModuleRegistry()
# Default to computed target based on branch
target = "{}-containers-candidate".format(self.dist_git.branch)
scratch = True
if self.params.release:
scratch = False
kwargs = "{{'src': '{}', 'target': '{}', 'opts': {{'scratch': {}, 'git_branch': '{}', 'yum_repourls': {}}}}}".format(
src, target, scratch, self.dist_git.branch, self._rhpkg_set_url_repos)
cmd.append(kwargs)
LOGGER.info("About to execute '{}'.".format(' '.join(cmd)))
if self.params.assume_yes or tools.decision("Do you want to build the image in OSBS?"):
build_type = "scratch" if scratch else "release"
LOGGER.info("Executing {} container build in OSBS...".format(build_type))
try:
task_id = subprocess.check_output(cmd).strip().decode("utf8")
except subprocess.CalledProcessError as ex:
raise CekitError("Building container image in OSBS failed", ex)
LOGGER.info("Task {0} was submitted, you can watch the progress here: {1}/taskinfo?taskID={0}".format(
task_id, self._koji_url))
if self.params.nowait:
return
self._wait_for_osbs_task(task_id)