Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@testcase.attr('positive')
def test_paging_next_option_start_in_middle(self):
"""Covers getting a list of resources and using the next reference."""
number_of_resources = 150
test_model = self.create_model()
filter = self._set_filter_field(test_model)
self.create_resources(count=number_of_resources, model=test_model)
# First set of resources
limit = number_of_resources // 10
offset = number_of_resources // 2
resp, resources, next_ref, prev_ref = self.get_resources(
limit=limit, offset=offset, filter=filter)
self.assertEqual(200, resp.status_code)
config_path = os.path.join(
xdg.BaseDirectory.save_config_path("snapcraft"), "cli.cfg"
)
with open(config_path, "w") as f:
f.write("[Lifecycle]\noutdated_step_action = clean")
self.copy_project_to_cwd("cmake-with-lib")
output = self.run_snapcraft("prime")
# Assert that cmake actually configured and built from scratch
self.assertThat(output, Contains("The CXX compiler identification"))
self.assertThat(
output, Contains("Building CXX object CMakeFiles/foo.dir/foo.cpp.o")
)
self.assertThat(
output, Contains("Building CXX object CMakeFiles/usefoo.dir/main.cpp.o")
)
binary_output = self.get_output_ignoring_non_zero_exit(
os.path.join(self.prime_dir, "bin", "usefoo")
)
self.assertThat(binary_output, Equals("foo\n"))
# Modify the source code
source_file_path = os.path.join("src", "foo.cpp")
shutil.copy("new_foo.cpp", source_file_path)
# Prime again. This should rebuild
output = self.run_snapcraft("prime")
# Assert that cmake did not start from scratch, and reused everything
# it could
self.copy_project_to_cwd('hg-branch')
self.init_source_control()
subprocess.check_call(
['hg', 'branch', 'second'], stdout=subprocess.DEVNULL)
open('second', 'w').close()
self.commit('second', 'second')
subprocess.check_call(
['hg', 'branch', 'default'], stdout=subprocess.DEVNULL)
open('default', 'w').close()
self.commit('default', 'default')
self.run_snapcraft('pull')
self.assertThat(
os.path.join(self.parts_dir, 'mercurial', 'src', 'second'),
FileExists())
self.run_snapcraft('pull')
self.assertThat(
os.path.join(self.parts_dir, 'mercurial', 'src', 'second'),
FileExists())
def test_reset(self):
repo = Repo(self._dir.path)
repo.add("foo")
repo.commit()
path = os.path.join(self._dir.path, "foo")
self.assertThat(path, FileExists())
self._dir.unlink("foo")
self.assertThat(path, Not(FileExists()))
repo.reset()
self.assertThat(path, FileExists())
binary_output = self.get_output_ignoring_non_zero_exit(
os.path.join(self.prime_dir, "bin", "usefoo")
)
self.assertThat(binary_output, Equals("foo\n"))
# Modify the source code
source_file_path = os.path.join("src", "foo.cpp")
shutil.copy("new_foo.cpp", source_file_path)
# Prime again. This should rebuild
output = self.run_snapcraft("prime")
# Assert that cmake did not start from scratch, and reused everything
# it could
self.assertThat(output, Not(Contains("The CXX compiler identification")))
self.assertThat(
output, Contains("Building CXX object CMakeFiles/foo.dir/foo.cpp.o")
)
self.assertThat(
output,
Not(Contains("Building CXX object CMakeFiles/usefoo.dir/main.cpp.o")),
)
binary_output = self.get_output_ignoring_non_zero_exit(
os.path.join(self.prime_dir, "bin", "usefoo")
)
self.assertThat(binary_output, Equals("new foo\n"))
@testtools.skipIf(
not CONF.object_storage_feature_enabled.container_sync,
'Old-style container sync function is disabled')
def test_container_synchronization(self):
def make_headers(cont, cont_client):
# tell first container to synchronize to a second
client_proxy_ip = \
urlparse.urlparse(cont_client.base_url).netloc.split(':')[0]
client_base_url = \
cont_client.base_url.replace(client_proxy_ip,
self.local_ip)
headers = {'X-Container-Sync-Key': 'sync_key',
'X-Container-Sync-To': "%s/%s" %
(client_base_url, str(cont))}
return headers
self._test_container_synchronization(make_headers)
def _check_output_presence(self, spec):
outspec = spec.get('outputs', {})
unmatched_output = []
for ospec_id in outspec:
ospec = outspec[ospec_id]
ospectype = ospec['type']
if ospectype == 'file':
self.assertThat(
ospec['value'],
Annotate('expected output file missing', FileExists()))
elif ospectype == 'directory':
self.assertThat(
ospec['value'],
Annotate('expected output directory missing', DirExists()))
elif ospectype == 'string' and ospec_id.startswith('tests'):
execinfo = self._details['exec_info']
sec, idx, field = ospec_id.split('::')
for f, matcher in __spec_matchers__.iteritems():
if f in ospec:
# allow for multiple target values (given a matcher) being
# specified. For some matchers it might make no sense
# (e.g. "endswith")
targets = ospec[f]
for target in (targets if isinstance(targets, list) else [targets]):
target = unicode.replace(target, "", os.linesep)
# TODO: This replacement may be should be done elsewhere
# to have a general solution. It's now affecting string-type only.
# Additionally, "" may appear in some output intentionally,
# so let's find sth closer to be 'unique'.
self.assertThat(
@testcase.attr('positive')
def test_acl_get(self):
secret_ref = self.secret_behaviors.store_secret()
container_ref = self.container_behaviors.create_container(
secret_hrefs=[secret_ref])
data = self.acl_behaviors.acl_submit(entity_ref=secret_ref,
users=['u1', 'u2'])
self.assertIsNotNone(data)
data = self.acl_behaviors.acl_get(entity_ref=secret_ref)
self.assertIn('u2', data['Users'])
self.assertEqual('True', data['Project Access'])
self.assertEqual(secret_ref + "/acl", data['Secret ACL Ref'])
data = self.acl_behaviors.acl_get(entity_ref=secret_ref + "///")
test_to_worker = {}
def map_test(test_dict):
tags = test_dict['tags']
id = test_dict['id']
workers = []
for tag in tags:
if tag.startswith('worker-'):
workers.append(tag)
if not workers:
workers = [None]
for worker in workers:
worker_to_test.setdefault(worker, []).append(id)
test_to_worker.setdefault(id, []).extend(workers)
mapper = testtools.StreamToDict(map_test)
mapper.startTestRun()
try:
case.run(mapper)
finally:
mapper.stopTestRun()
self._worker_to_test = worker_to_test
self._test_to_worker = test_to_worker
failing_workers = self._test_to_worker[failing_id]
prior_tests = []
for worker in failing_workers:
worker_tests = self._worker_to_test[worker]
prior_tests.extend(worker_tests[:worker_tests.index(failing_id)])
return prior_tests
def _find_failing(repo):
run = repo.get_failing()
case = run.get_test()
ids = []
def gather_errors(test_dict):
if test_dict['status'] == 'fail':
ids.append(test_dict['id'])
result = testtools.StreamToDict(gather_errors)
result.startTestRun()
try:
case.run(result)
finally:
result.stopTestRun()
return ids