Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@type arch: string
@param arch: packages built for this architecture, but also including
architecture independent (noarch) packages
'''
info = self.get_pkg_info(pkg)
rpms = self.get_pkg_rpm_info(pkg, arch)
rpm_urls = []
if self.config_options.has_key('pkgurl'):
base_url = self.config_options['pkgurl']
else:
base_url = "%s/%s" % (self.config_options['topurl'],
'packages')
for rpm in rpms:
rpm_name = koji.pathinfo.rpm(rpm)
url = ("%s/%s/%s/%s/%s" % (base_url,
info['package_name'],
info['version'], info['release'],
rpm_name))
rpm_urls.append(url)
return rpm_urls
def set_up_files(self, name):
datadir = os.path.join(os.path.dirname(__file__), 'data/image', name)
# load image result data for our test build
data = json.load(open(datadir + '/data.json'))
self.db_expect = json.load(open(datadir + '/db.json'))
for arch in data:
taskdir = koji.pathinfo.task(data[arch]['task_id'])
os.makedirs(taskdir)
filenames = data[arch]['files'] + data[arch]['logs']
for filename in filenames:
path = os.path.join(taskdir, filename)
with open(path, 'w') as fp:
fp.write('Test file for %s\n%s\n' % (arch, filename))
self.image_data = data
options='options',
workdir='workdir',
demux=orchestrator)
if orchestrator:
get_logs_fname = 'get_orchestrator_build_logs'
log_entries = [LogEntry(None, 'line 1'),
LogEntry(None, 'line 2'),
LogEntry('x86_64', 'line 1'),
LogEntry('x86_64', 'line 2'),
LogEntry(builder_containerbuild.METADATA_TAG, 'x.log')]
koji_tmpdir = tmpdir.mkdir('koji')
koji_tmpdir.join('x.log').write('line 1\n'
'line 2\n')
(flexmock(koji.pathinfo)
.should_receive('work')
.and_return(str(koji_tmpdir)))
else:
get_logs_fname = 'get_build_logs'
log_entries = ['line 1',
'line 2']
build_response = flexmock(status=42)
(build_response
.should_receive('is_running')
.and_return(build_not_finished))
(build_response
.should_receive('is_pending')
.and_return(build_not_finished))
cct._osbs = flexmock()
# generate pkglist file and sigmap
self.files.append('pkglist')
plist = os.path.join(uploaddir, 'pkglist')
nvrs = ['aaa-1.0-2', 'bbb-3.0-5', 'ccc-8.0-13','ddd-21.0-34']
self.sigmap = []
self.rpms = {}
self.builds ={}
self.key = '4c8da725'
with open(plist, 'w') as f_pkglist:
for nvr in nvrs:
binfo = koji.parse_NVR(nvr)
rpminfo = binfo.copy()
rpminfo['arch'] = 'x86_64'
builddir = koji.pathinfo.build(binfo)
relpath = koji.pathinfo.signed(rpminfo, self.key)
path = os.path.join(builddir, relpath)
koji.ensuredir(os.path.dirname(path))
basename = os.path.basename(path)
with open(path, 'w') as fo:
fo.write('%s' % basename)
f_pkglist.write(path)
f_pkglist.write('\n')
self.expected.append('x86_64/%s/%s' % (basename[0], basename))
build_id = len(self.builds) + 10000
rpm_id = len(self.rpms) + 20000
binfo['id'] = build_id
rpminfo['build_id'] = build_id
rpminfo['id'] = rpm_id
self.builds[build_id] = binfo
self.rpms[rpm_id] = rpminfo
self.sigmap.append([rpm_id, self.key])
fo.write('%s' % fn)
# generate pkglist file and sigmap
self.files.append('pkglist')
plist = os.path.join(uploaddir, 'pkglist')
nvrs = ['aaa-1.0-2', 'bbb-3.0-5', 'ccc-8.0-13','ddd-21.0-34']
self.sigmap = []
self.rpms = {}
self.builds ={}
self.key = '4c8da725'
with open(plist, 'w') as f_pkglist:
for nvr in nvrs:
binfo = koji.parse_NVR(nvr)
rpminfo = binfo.copy()
rpminfo['arch'] = 'x86_64'
builddir = koji.pathinfo.build(binfo)
relpath = koji.pathinfo.signed(rpminfo, self.key)
path = os.path.join(builddir, relpath)
koji.ensuredir(os.path.dirname(path))
basename = os.path.basename(path)
with open(path, 'w') as fo:
fo.write('%s' % basename)
f_pkglist.write(path)
f_pkglist.write('\n')
self.expected.append('x86_64/%s/%s' % (basename[0], basename))
build_id = len(self.builds) + 10000
rpm_id = len(self.rpms) + 20000
binfo['id'] = build_id
rpminfo['build_id'] = build_id
rpminfo['id'] = rpm_id
self.builds[build_id] = binfo
self.rpms[rpm_id] = rpminfo
def get_pkg_rpm_file_names(self, pkg, arch=None):
'''
Gets the file names for the RPM packages specified in pkg
@type pkg: KojiPkgSpec
@param pkg: a package specification
@type arch: string
@param arch: packages built for this architecture, but also including
architecture independent (noarch) packages
'''
if arch is None:
arch = utils.get_arch()
rpm_names = []
rpms = self.get_pkg_rpm_info(pkg, arch)
for rpm in rpms:
arch_rpm_name = koji.pathinfo.rpm(rpm)
rpm_name = os.path.basename(arch_rpm_name)
rpm_names.append(rpm_name)
return rpm_names
opts[name] = default
if opts['DBHost'] is None:
opts['DBHost'] = opts['DBhost']
# load policies
# (only from config file)
if config and config.has_section('policy'):
#for the moment, we simply transfer the policy conf to opts
opts['policy'] = dict(config.items('policy'))
else:
opts['policy'] = {}
for pname, text in six.iteritems(_default_policies):
opts['policy'].setdefault(pname, text)
# use configured KojiDir
if opts.get('KojiDir') is not None:
koji.BASEDIR = opts['KojiDir']
koji.pathinfo.topdir = opts['KojiDir']
return opts
def linked_upload(localfile, path, name=None):
"""Link a file into the (locally writable) workdir, bypassing upload"""
old_umask = os.umask(0o02)
try:
if name is None:
name = os.path.basename(localfile)
dest_dir = os.path.join(koji.pathinfo.work(), path)
dst = os.path.join(dest_dir, name)
koji.ensuredir(dest_dir)
# fix uid/gid to keep httpd happy
st = os.stat(koji.pathinfo.work())
os.chown(dest_dir, st.st_uid, st.st_gid)
print("Linking rpm to: %s" % dst)
os.link(localfile, dst)
finally:
os.umask(old_umask)