Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@pytest.mark.parametrize('algorithm', SUPPORTED_HASH_ALGORITHMS)
def test_cekit_supported_algorithms(tmpdir, algorithm):
"""
This is a bit counter intuitive, but we try to cache an artifact
with all known supported hash algorithms. In case when the algorithm
would not be supported, it would result in return code 2 and a different error message
"""
work_dir = str(tmpdir.mkdir('work_dir'))
artifact = os.path.join(work_dir, 'artifact')
open(artifact, 'a').close()
result = run_cekit_cache(
[
'-v',
'--work-dir',
'--work-dir',
work_dir,
'add',
artifact,
'--md5',
'd41d8cd98f00b204e9800998ecf8427e'])
assert "cached with UUID" in result.output
# Artifact added to cache should have all supported checksums computed
result = run_cekit_cache(['-v',
'--work-dir',
work_dir,
'ls'])
for alg in SUPPORTED_HASH_ALGORITHMS:
assert alg in result.output
assert "sha256: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" in result.output
assert "sha1: da39a3ee5e6b4b0d3255bfef95601890afd80709" in result.output
assert "md5: d41d8cd98f00b204e9800998ecf8427e" in result.output
def ls(self):
artifact_cache = ArtifactCache()
artifacts = artifact_cache.list()
if artifacts:
for artifact_filename, artifact in artifacts.items():
click.echo("\n{}:".format(click.style(
artifact_filename.split('.')[0], fg='green', bold=True)))
for alg in SUPPORTED_HASH_ALGORITHMS:
if alg in artifact and artifact[alg]:
click.echo(" {}: {}".format(click.style(alg, bold=True), artifact[alg]))
if artifact['names']:
click.echo(" {}:".format(click.style("names", bold=True)))
for name in artifact['names']:
click.echo(" - %s" % name)
else:
click.echo('No artifacts cached!')
def __verify(self, target):
""" Checks all defined check_sums for an aritfact """
if not set(SUPPORTED_HASH_ALGORITHMS).intersection(self):
logger.debug("Artifact '{}' lacks any checksum definition.".format(self.name))
return False
if not Resource.CHECK_INTEGRITY:
logger.info("Integrity checking disabled, skipping verification.")
return True
if os.path.isdir(target):
logger.info("Target is directory, cannot verify checksum.")
return True
for algorithm in SUPPORTED_HASH_ALGORITHMS:
if algorithm in self and self[algorithm]:
if not check_sum(target, algorithm, self[algorithm], self['name']):
return False
return True
def __verify(self, target):
""" Checks all defined check_sums for an aritfact """
if not set(SUPPORTED_HASH_ALGORITHMS).intersection(self):
logger.debug("Artifact '{}' lacks any checksum definition.".format(self.name))
return False
if not Resource.CHECK_INTEGRITY:
logger.info("Integrity checking disabled, skipping verification.")
return True
if os.path.isdir(target):
logger.info("Target is directory, cannot verify checksum.")
return True
for algorithm in SUPPORTED_HASH_ALGORITHMS:
if algorithm in self and self[algorithm]:
if not check_sum(target, algorithm, self[algorithm], self['name']):
return False
return True
def get(self, artifact):
for alg in SUPPORTED_HASH_ALGORITHMS:
if alg in artifact:
return self._find_artifact(alg, artifact[alg])
raise CekitError('Artifact is not cached.')
def add(self, artifact):
if not set(SUPPORTED_HASH_ALGORITHMS).intersection(artifact):
raise ValueError('Cannot cache artifact without checksum')
if self.cached(artifact):
raise CekitError('Artifact is already cached!')
artifact_id = str(uuid.uuid4())
artifact_file = os.path.expanduser(os.path.join(self.cache_dir, artifact_id))
if not os.path.exists(artifact_file):
artifact.guarded_copy(artifact_file)
cache_entry = {'names': [artifact['name']],
'cached_path': artifact_file}
# We should populate the cache entry with checksums for all supported algorithms
for alg in SUPPORTED_HASH_ALGORITHMS:
def guarded_copy(self, target):
try:
self._copy_impl(target)
except Exception as ex:
logger.warning("Cekit is not able to fetch resource '{}' automatically. "
"Please use cekit-cache command to add this artifact manually.".format(self.name))
if self.description:
logger.info(self.description)
# exception is fatal we be logged before Cekit dies
raise CekitError("Error copying resource: '%s'. See logs for more info."
% self.name, ex)
if set(SUPPORTED_HASH_ALGORITHMS).intersection(self) and \
not self.__verify(target):
raise CekitError('Artifact checksum verification failed!')
return target