Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import ddt
from eom import bastion
from tests import util
bastion.configure(util.CONF)
@ddt.ddt
class TestBastion(util.TestCase):
def setUp(self):
super(TestBastion, self).setUp()
self.app = util.app
self.wrapped_app = util.wrap_403(self.app)
self.bastion = bastion.wrap(self.app, self.wrapped_app)
self.unrestricted_route = '/v1/health'
self.normal_route = '/v1'
def tearDown(self):
super(TestBastion, self).tearDown()
bastion._CONF.clear_override('gate_headers', bastion.OPT_GROUP_NAME)
import ddt
import mock
import six
import six.moves.builtins as __builtin__
from rally import exceptions
from rally.plugins.common.exporter import file_system
from tests.unit import test
if six.PY3:
import io
file = io.BytesIO
@ddt.ddt
class FileExporterTestCase(test.TestCase):
@mock.patch("rally.plugins.common.exporter.file_system.os.path.exists")
@mock.patch.object(__builtin__, "open", autospec=True)
@mock.patch("rally.plugins.common.exporter.file_system.json.dumps")
@mock.patch("rally.api.API")
def test_file_exporter_export(self, mock_api, mock_dumps,
mock_open, mock_exists):
rapi = mock_api.return_value
mock_exists.return_value = True
rapi.task.get_detailed.return_value = {"results": [{
"key": "fake_key",
"data": {
"raw": "bar_raw",
"sla": "baz_sla",
"hooks": "baz_hooks",
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ddt
import mock
from rally.common import utils
from rally import exceptions
from rally.plugins.openstack.context import api_versions
from rally.task import context
from tests.unit import test
@ddt.ddt
class OpenStackServicesTestCase(test.TestCase):
def setUp(self):
super(OpenStackServicesTestCase, self).setUp()
self.mock_clients = mock.patch(
"rally.plugins.openstack.osclients.Clients").start()
osclient_kc = self.mock_clients.return_value.keystone
self.mock_kc = osclient_kc.return_value
self.service_catalog = osclient_kc.service_catalog
self.service_catalog.get_endpoints.return_value = []
self.mock_kc.services.list.return_value = []
@ddt.data(({"nova": {"service_type": "compute", "version": 2},
"cinder": {"service_name": "cinderv2", "version": 2},
"neutron": {"service_type": "network"},
"glance": {"service_name": "glance"},
# -*- coding: utf-8 -*-
"""Tests of i18n/dummy.py"""
import ddt
from polib import POEntry
from i18n import dummy
from . import I18nToolTestCase,MOCK_APPLICATION_DIR
@ddt.ddt
class TestDummy(I18nToolTestCase):
"""
Tests functionality of i18n/dummy.py
"""
def setUp(self):
super(TestDummy, self).setUp()
self.converter = dummy.Dummy()
def assertUnicodeEquals(self, str1, str2):
"""Just like assertEquals, but doesn't put Unicode into the fail message.
Either nose, or rake, or something, deals very badly with unusual
Unicode characters in the assertions, so we use repr here to keep
things safe.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ddt
from rally.plugins.common.sla import outliers
from rally.task import sla
from tests.unit import test
@ddt.ddt
class OutliersTestCase(test.TestCase):
@ddt.data(({"max": 0, "min_iterations": 5, "sigmas": 2.5}, True),
({"max": -1}, False),
({"max": 0, "min_iterations": 2}, False),
({"max": 0, "sigmas": 0}, False),
({"foo": "bar"}, False))
@ddt.unpack
def test_validate(self, config, valid):
results = sla.SLA.validate("outliers", None, None, config)
if valid:
self.assertEqual([], results)
else:
self.assertEqual(1, len(results))
def test_result(self):
"username": "admin",
"tenant_name": "admin",
"password": "admin-12345",
"auth_url": "http://test:5000/v2.0/",
"permission": "admin",
"region_name": "test",
"https_insecure": False,
"https_cacert": "/path/to/cacert/file",
"user_domain_name": "admin",
"project_domain_name": "admin"
}
PATH = "rally_openstack.verification.tempest.context"
@ddt.ddt
class TempestContextTestCase(test.TestCase):
def setUp(self):
super(TempestContextTestCase, self).setUp()
self.mock_isfile = mock.patch("os.path.isfile",
return_value=True).start()
self.cred = fakes.FakeCredential(**CRED)
self.deployment = fakes.FakeDeployment(
uuid="fake_deployment", admin=self.cred)
cfg = {"verifier": mock.Mock(deployment=self.deployment),
"verification": {"uuid": "uuid"}}
cfg["verifier"].manager.home_dir = "/p/a/t/h"
cfg["verifier"].manager.configfile = "/fake/path/to/config"
self.context = context.TempestContext(cfg)
# under the License.
import ddt
import mock
import os_faults
from os_faults.api import error
from rally import consts
from rally.plugins.openstack.hook import fault_injection
from rally.task import hook
from tests.unit import fakes
from tests.unit import test
@ddt.ddt
class FaultInjectionHookTestCase(test.TestCase):
def setUp(self):
super(FaultInjectionHookTestCase, self).setUp()
self.task = {"deployment_uuid": "foo_uuid"}
@ddt.data((dict(action="foo"), True),
(dict(action="foo", verify=True), True),
(dict(action=10), False),
(dict(action="foo", verify=10), False),
(dict(), False))
@ddt.unpack
def test_config_schema(self, config, valid):
results = hook.HookAction.validate("fault_injection", None, None,
config)
if valid:
def test_fetch_nonexisting_partition_404s(self):
self.simulate_get('/v1/partition/no')
self.assertEqual(self.srmock.status, falcon.HTTP_404)
def test_patch_nonexisting_partition_404s(self):
doc = {'weight': 1}
self.simulate_patch('/v1/partition/no', body=json.dumps(doc))
self.assertEqual(self.srmock.status, falcon.HTTP_404)
def test_bad_json_on_put_raises_bad_request(self):
self.simulate_put('/v1/partitions/bad_json', body="")
self.assertEqual(self.srmock.status, falcon.HTTP_400)
@ddt.ddt
class ExistingPartitionTest(base.TestBase):
@classmethod
def setUpClass(cls):
super(ExistingPartitionTest, cls).setUpClass()
def setUp(self):
super(ExistingPartitionTest, self).setUp()
self.weight = 100
self.hosts = ['a']
self.name = str(uuid.uuid1())
self.partition_uri = '/v1/partitions/' + self.name
doc = {'weight': self.weight, 'hosts': self.hosts}
self.simulate_put(self.partition_uri, body=json.dumps(doc))
self.assertEqual(self.srmock.status, falcon.HTTP_201)
- name : unknown2
type : AES
encrypted: 5942f9a9aff
factors : [3631, 129949621]
error : Failed reconstructing AES-key!
warning : Incomplete factorization ## UNUSED
"""
def read_app_db():
return yaml.load(textwrap.dedent(app_db_txt))
app_db = read_app_db()
@ddt.ddt
class TUnfactor(unittest.TestCase):
@classmethod
def setUpClass(cls):
os.chdir(os.path.dirname(__file__))
@ddt.data(*[k for k in app_db['keys'] if k['type'] == 'AES'])
def test_unfactor_from_file(self, key_rec):
for f in key_rec.get('crypted_files', ()):
exp_aes_key = key_rec.get('decrypted')
if not exp_aes_key:
continue
factors = [int(fc) for fc in key_rec['factors']]
aes_keys = unfactor.unfactor_key_from_file(f, factors)
#print(key_rec['name'], f, aes_keys, exp_aes_key)
self.assertIn(exp_aes_key, aes_keys,
(key_rec['name'], f, aes_keys, exp_aes_key))
"username": "admin",
"tenant_name": "admin",
"password": "admin-12345",
"auth_url": "http://test:5000/v2.0/",
"permission": "admin",
"region_name": "test",
"https_insecure": False,
"https_cacert": "/path/to/cacert/file",
"user_domain_name": "admin",
"project_domain_name": "admin"
}
PATH = "rally.plugins.openstack.verification.tempest.config"
@ddt.ddt
class TempestConfigfileManagerTestCase(test.TestCase):
def setUp(self):
super(TempestConfigfileManagerTestCase, self).setUp()
deployment = fakes.FakeDeployment(uuid="fake_deployment",
admin=fakes.fake_credential(**CRED))
self.tempest = config.TempestConfigfileManager(deployment)
def test__configure_auth(self):
self.tempest.conf.add_section("auth")
self.tempest._configure_auth()
expected = (
("admin_username", CRED["username"]),
("admin_password", CRED["password"]),
("admin_project_name", CRED["tenant_name"]),